| """ |
| DGA-FANCI: Random Forest with 27 FANCI features for DGA detection. |
| Trained on 54 DGA families. |
| File size: ~1 GB (Random Forest with many estimators). |
| |
| IMPORTANT: FANCIFeatureExtractor and FANCIRandomForest must be defined here |
| so that joblib can deserialize the saved model object. |
| """ |
|
|
| import re |
| import math |
| from collections import Counter |
| import numpy as np |
| import pandas as pd |
| import tldextract |
|
|
|
|
class FANCIFeatureExtractor:
    """Compute the 27 lexical FANCI features for a single domain name.

    Most features are derived from the "dot-free, public-suffix-free"
    string: the effective 2nd-level domain with every non-alphanumeric
    character removed.

    NOTE: instances of this class are embedded in the pickled model file,
    so every feature computation must stay numerically identical to the
    one used at training time. Do not change feature semantics here; only
    non-behavioral fixes (exception narrowing, docs) are safe.
    """

    def __init__(self):
        # Small allowlist of common TLDs used by 'has_valid_tld' and
        # 'contains_tld_as_subdomain' (intentionally not a full PSL).
        self.valid_tlds = {'com', 'org', 'net', 'edu', 'gov', 'mil', 'int', 'arpa',
                           'de', 'uk', 'fr', 'it', 'es', 'ru', 'cn', 'jp', 'br', 'au', 'ca'}
        self.vowels = set('aeiouAEIOU')
        self.consonants = set('bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ')

    def extract_e2ld(self, domain: str) -> str:
        """Return the effective second-level domain of *domain*.

        Well-known dynamic-DNS providers (dyndns.org, ddns.net) are treated
        as public suffixes: the registrant-controlled label sits one level
        deeper. Falls back to the raw input if tldextract fails.
        """
        try:
            extracted = tldextract.extract(domain)
            if extracted.suffix in ['dyndns.org', 'ddns.net']:
                return f"{extracted.subdomain}.{extracted.domain}" if extracted.subdomain else extracted.domain
            return extracted.domain
        except Exception:  # FIX: was a bare 'except:' (also trapped SystemExit/KeyboardInterrupt)
            return domain

    def get_dot_free_public_suffix_free(self, domain: str) -> str:
        """Return the e2ld stripped of every non-alphanumeric character."""
        e2ld = self.extract_e2ld(domain)
        return re.sub(r'[^a-zA-Z0-9]', '', e2ld)

    def _is_prefix_repetition(self, domain: str) -> int:
        """Return 1 if every inner label starts with the first label.

        The last label (the TLD) is deliberately excluded from the check,
        so a two-label domain always yields 1 — kept as-is for training
        compatibility.
        """
        parts = domain.split('.')
        if len(parts) < 2:
            return 0
        base = parts[0]
        for i in range(1, len(parts) - 1):
            if not parts[i].startswith(base):
                return 0
        return 1

    def _is_hex(self, s) -> bool:
        """Return True if *s* is a non-empty string parseable as base-16."""
        try:
            int(s, 16)
            return len(s) > 0
        except (ValueError, TypeError):  # FIX: was a bare 'except:'
            return False

    def _contains_ip(self, domain: str) -> bool:
        """True if *domain* contains a dotted-quad pattern.

        Octets are not range-checked (e.g. 999.999.999.999 matches) —
        kept as-is for training compatibility.
        """
        return bool(re.search(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', domain))

    def _consecutive_ratio(self, text: str, char_set) -> float:
        """Fraction of characters belonging to runs (length >= 2) of
        characters drawn from *char_set*."""
        if not text:
            return 0
        res, cur = [], 0
        for c in text:
            if c in char_set:
                cur += 1
            else:
                if cur >= 2:
                    res.append(cur)
                cur = 0
        if cur >= 2:  # flush a run that ends at the end of the string
            res.append(cur)
        return sum(res) / max(1, len(text))

    def _ngram_features(self, text: str) -> dict:
        """Distribution statistics over per-character (1-gram) counts."""
        if not text:
            return {k: 0 for k in ['ngram_mean', 'ngram_std', 'ngram_min', 'ngram_max',
                                   'ngram_median', 'ngram_q25', 'ngram_q75']}
        freqs = list(Counter(text).values())
        return {
            'ngram_mean': np.mean(freqs),
            'ngram_std': np.std(freqs),
            'ngram_min': np.min(freqs),
            'ngram_max': np.max(freqs),
            'ngram_median': np.median(freqs),
            'ngram_q25': np.percentile(freqs, 25),
            'ngram_q75': np.percentile(freqs, 75),
        }

    def _calculate_entropy(self, text: str) -> float:
        """Shannon entropy (bits) of the character distribution of *text*."""
        if not text:
            return 0
        probs = [c / len(text) for c in Counter(text).values()]
        return -sum(p * math.log2(p) for p in probs)

    def extract_features(self, domain: str) -> dict:
        """Return the full feature dict for *domain* (lower-cased first)."""
        domain = domain.lower().strip()
        e2ld = self.extract_e2ld(domain)
        parts = domain.split('.')
        ddsf = self.get_dot_free_public_suffix_free(domain)
        f = {}
        f['domain_length'] = len(domain)
        f['num_subdomains'] = max(0, len(parts) - 2)
        # Mean length over all labels except the TLD; for a bare label,
        # fall back to the e2ld length.
        f['subdomain_length_mean'] = np.mean([len(p) for p in parts[:-1]]) if len(parts) > 1 else len(e2ld)
        f['has_www_prefix'] = 1 if domain.startswith('www.') else 0
        f['has_valid_tld'] = 1 if parts[-1] in self.valid_tlds else 0
        f['contains_single_char_subdomain'] = 1 if any(len(p) == 1 for p in parts[:-1]) else 0
        f['is_exclusive_prefix_repetition'] = self._is_prefix_repetition(domain)
        f['contains_tld_as_subdomain'] = 1 if any(p in self.valid_tlds for p in parts[:-1]) else 0
        total_sub = max(1, len(parts) - 1)
        f['ratio_digit_exclusive_subdomains'] = sum(1 for p in parts[:-1] if p.isdigit()) / total_sub
        f['ratio_hex_exclusive_subdomains'] = sum(1 for p in parts[:-1] if self._is_hex(p)) / total_sub
        # NOTE(review): ddsf strips non-alphanumerics, so this ratio is
        # always 0 — kept unchanged for training compatibility.
        f['underscore_ratio'] = ddsf.count('_') / max(1, len(ddsf))
        f['contains_ip_address'] = 1 if self._contains_ip(domain) else 0
        f['contains_digits'] = 1 if any(c.isdigit() for c in ddsf) else 0
        f['vowel_ratio'] = sum(1 for c in ddsf if c in self.vowels) / max(1, len(ddsf))
        f['digit_ratio'] = sum(1 for c in ddsf if c.isdigit()) / max(1, len(ddsf))
        f['alphabet_cardinality'] = len(set(ddsf))
        counts = Counter(ddsf)
        f['ratio_repeated_characters'] = sum(1 for c in counts.values() if c > 1) / max(1, len(set(ddsf)))
        f['ratio_consecutive_consonants'] = self._consecutive_ratio(ddsf, self.consonants)
        f['ratio_consecutive_digits'] = self._consecutive_ratio(ddsf, set('0123456789'))
        f.update(self._ngram_features(ddsf))
        f['entropy'] = self._calculate_entropy(ddsf)
        return f
|
|
|
|
class FANCIRandomForest:
    """Thin wrapper around the trained Random Forest DGA classifier.

    This class is a deserialization shell: its real state
    (``rf``, ``feature_extractor``, ``feature_names``) is restored by
    joblib/pickle from the saved model file, which is why ``__init__``
    assigns nothing.
    """

    def __init__(self):
        pass

    def extract_features_from_dataframe(self, df):
        """Build the feature matrix for every entry in ``df['domain']``.

        Columns are aligned to ``self.feature_names`` (missing columns are
        zero-filled) so the matrix matches the training-time layout.
        """
        rows = [self.feature_extractor.extract_features(name) for name in df['domain']]
        matrix = pd.DataFrame(rows)
        return matrix.reindex(columns=self.feature_names, fill_value=0)

    def predict(self, domains):
        """Classify each domain; return dicts with label and DGA probability."""
        features = self.extract_features_from_dataframe(pd.DataFrame({'domain': domains}))
        labels = self.rf.predict(features)
        scores = self.rf.predict_proba(features)
        results = []
        for idx, name in enumerate(domains):
            results.append({
                'domain': name,
                'prediction': 'DGA' if labels[idx] == 1 else 'Benign',
                'dga_probability': scores[idx][1],
            })
        return results
|
|
|
|
def load_model(model_path: str):
    """Load the FANCI Random Forest bundle from a joblib file.

    The model was pickled from a Colab notebook (``__main__`` context), so
    the unpickler resolves FANCIFeatureExtractor and FANCIRandomForest
    against ``__main__``; both classes are injected there before
    deserializing.

    SECURITY NOTE: joblib.load is pickle-based and can execute arbitrary
    code; only load model files from a trusted source.
    """
    import sys
    import joblib

    main_module = sys.modules['__main__']
    for injected in (FANCIFeatureExtractor, FANCIRandomForest):
        setattr(main_module, injected.__name__, injected)

    return joblib.load(model_path)
|
|
|
|
def predict(model, domains):
    """Predict DGA vs legit for one domain or a list of domains.

    Args:
        model: FANCIRandomForest instance loaded via ``load_model``.
        domains: a single domain string or a list of domain strings.

    Returns:
        list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
        where score is the DGA probability rounded to 4 decimals.
    """
    # Accept a bare string for convenience; normalize to a list.
    domain_list = [domains] if isinstance(domains, str) else domains

    formatted = []
    for entry in model.predict(domain_list):
        is_dga = entry["prediction"] == "DGA"
        formatted.append({
            "domain": entry["domain"],
            "label": "dga" if is_dga else "legit",
            "score": round(float(entry["dga_probability"]), 4),
        })
    return formatted
|
|