# dga-fanci / model.py
# Last commit (Reynier, f63c190, verified):
#   Fix: inject FANCIFeatureExtractor and FANCIRandomForest into __main__ before joblib.load()
"""
DGA-FANCI: Random Forest with 27 FANCI features for DGA detection.
Trained on 54 DGA families.
File size: ~1 GB (Random Forest with many estimators).
IMPORTANT: FANCIFeatureExtractor and FANCIRandomForest must be defined here
so that joblib can deserialize the saved model object.
"""
import re
import math
from collections import Counter
import numpy as np
import pandas as pd
import tldextract
class FANCIFeatureExtractor:
    """Compute the 27 FANCI lexical features for one domain name.

    Features are derived from three views of the (lowercased) domain:
      * the full domain string,
      * its dot-separated labels,
      * the effective 2nd-level domain stripped of non-alphanumerics
        (the "dot-free, public-suffix-free" string, `ddsf` below).

    NOTE(review): the serialized Random Forest was trained on these exact
    formulas — do not alter any feature computation, only its robustness.
    """

    def __init__(self):
        # Whitelist of "known good" TLDs, fixed at training time; feeds the
        # has_valid_tld and contains_tld_as_subdomain features.
        self.valid_tlds = {'com', 'org', 'net', 'edu', 'gov', 'mil', 'int', 'arpa',
                           'de', 'uk', 'fr', 'it', 'es', 'ru', 'cn', 'jp', 'br', 'au', 'ca'}
        self.vowels = set('aeiouAEIOU')
        self.consonants = set('bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ')

    def extract_e2ld(self, domain):
        """Return the effective 2nd-level domain of `domain` via tldextract.

        For the two dynamic-DNS suffixes the first subdomain label is kept,
        since that is the registrant-controlled part. Falls back to the raw
        input string if tldextract fails for any reason.
        """
        try:
            extracted = tldextract.extract(domain)
            if extracted.suffix in ['dyndns.org', 'ddns.net']:
                return f"{extracted.subdomain}.{extracted.domain}" if extracted.subdomain else extracted.domain
            return extracted.domain
        except Exception:  # was a bare except: don't swallow KeyboardInterrupt/SystemExit
            return domain

    def get_dot_free_public_suffix_free(self, domain):
        """Return the e2ld with every non-alphanumeric character removed."""
        e2ld = self.extract_e2ld(domain)
        return re.sub(r'[^a-zA-Z0-9]', '', e2ld)

    def _is_prefix_repetition(self, domain):
        """1 if every middle label starts with the first label, else 0.

        The last label (TLD) is excluded from the check; note that any
        two-label domain therefore returns 1 vacuously — kept as-is for
        model compatibility.
        """
        parts = domain.split('.')
        if len(parts) < 2:
            return 0
        base = parts[0]
        for i in range(1, len(parts) - 1):
            if not parts[i].startswith(base):
                return 0
        return 1

    def _is_hex(self, s):
        """True iff `s` is a non-empty hexadecimal string."""
        try:
            int(s, 16)
            return len(s) > 0
        except (ValueError, TypeError):  # was a bare except
            return False

    def _contains_ip(self, domain):
        """True iff the domain embeds a dotted-quad (IPv4-looking) pattern."""
        return bool(re.search(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', domain))

    def _consecutive_ratio(self, text, char_set):
        """Fraction of `text` covered by runs (length >= 2) of chars in `char_set`."""
        if not text:
            return 0
        res, cur = [], 0
        for c in text:
            if c in char_set:
                cur += 1
            else:
                if cur >= 2:  # only runs of 2+ contribute
                    res.append(cur)
                cur = 0
        if cur >= 2:  # flush a trailing run
            res.append(cur)
        return sum(res) / max(1, len(text))

    def _ngram_features(self, text):
        """Distribution statistics over per-character frequencies of `text`.

        (Despite the name, these are unigram character counts.) Returns a
        dict of seven 'ngram_*' features; all zeros for empty input.
        """
        if not text:
            return {k: 0 for k in ['ngram_mean', 'ngram_std', 'ngram_min', 'ngram_max',
                                   'ngram_median', 'ngram_q25', 'ngram_q75']}
        freqs = list(Counter(text).values())
        return {
            'ngram_mean': np.mean(freqs),
            'ngram_std': np.std(freqs),
            'ngram_min': np.min(freqs),
            'ngram_max': np.max(freqs),
            'ngram_median': np.median(freqs),
            'ngram_q25': np.percentile(freqs, 25),
            'ngram_q75': np.percentile(freqs, 75),
        }

    def _calculate_entropy(self, text):
        """Shannon entropy (bits per character) of `text`; 0 for empty input."""
        if not text:
            return 0
        probs = [c / len(text) for c in Counter(text).values()]
        return -sum(p * math.log2(p) for p in probs)

    def extract_features(self, domain):
        """Return the full 27-feature dict for one domain string.

        Structural features come from the dot-separated labels; character
        statistics come from the dot-free, suffix-free e2ld (`ddsf`).
        """
        domain = domain.lower().strip()
        e2ld = self.extract_e2ld(domain)
        parts = domain.split('.')
        ddsf = self.get_dot_free_public_suffix_free(domain)
        f = {}
        # --- structural features over the labels ---
        f['domain_length'] = len(domain)
        f['num_subdomains'] = max(0, len(parts) - 2)
        f['subdomain_length_mean'] = np.mean([len(p) for p in parts[:-1]]) if len(parts) > 1 else len(e2ld)
        f['has_www_prefix'] = 1 if domain.startswith('www.') else 0
        f['has_valid_tld'] = 1 if parts[-1] in self.valid_tlds else 0
        f['contains_single_char_subdomain'] = 1 if any(len(p) == 1 for p in parts[:-1]) else 0
        f['is_exclusive_prefix_repetition'] = self._is_prefix_repetition(domain)
        f['contains_tld_as_subdomain'] = 1 if any(p in self.valid_tlds for p in parts[:-1]) else 0
        total_sub = max(1, len(parts) - 1)  # guard against division by zero
        f['ratio_digit_exclusive_subdomains'] = sum(1 for p in parts[:-1] if p.isdigit()) / total_sub
        f['ratio_hex_exclusive_subdomains'] = sum(1 for p in parts[:-1] if self._is_hex(p)) / total_sub
        # --- character statistics over ddsf ---
        # (ddsf is alphanumeric-only, so underscore_ratio is always 0 here;
        # kept because the trained model expects the column.)
        f['underscore_ratio'] = ddsf.count('_') / max(1, len(ddsf))
        f['contains_ip_address'] = 1 if self._contains_ip(domain) else 0
        f['contains_digits'] = 1 if any(c.isdigit() for c in ddsf) else 0
        f['vowel_ratio'] = sum(1 for c in ddsf if c in self.vowels) / max(1, len(ddsf))
        f['digit_ratio'] = sum(1 for c in ddsf if c.isdigit()) / max(1, len(ddsf))
        f['alphabet_cardinality'] = len(set(ddsf))
        counts = Counter(ddsf)
        f['ratio_repeated_characters'] = sum(1 for c in counts.values() if c > 1) / max(1, len(set(ddsf)))
        f['ratio_consecutive_consonants'] = self._consecutive_ratio(ddsf, self.consonants)
        f['ratio_consecutive_digits'] = self._consecutive_ratio(ddsf, set('0123456789'))
        f.update(self._ngram_features(ddsf))
        f['entropy'] = self._calculate_entropy(ddsf)
        return f
class FANCIRandomForest:
    """Shell class for the serialized FANCI Random Forest.

    The attributes used below (`feature_extractor`, `feature_names`, `rf`)
    are NOT set here: joblib/pickle restores them straight into ``__dict__``
    when the saved model is loaded, so ``__init__`` intentionally does
    nothing.
    """

    def __init__(self):
        pass

    def extract_features_from_dataframe(self, df):
        """Build the feature matrix for ``df['domain']``.

        Columns are reordered to the training-time layout in
        ``self.feature_names``; columns the extractor did not produce are
        filled with 0.
        """
        rows = []
        for name in df['domain']:
            rows.append(self.feature_extractor.extract_features(name))
        matrix = pd.DataFrame(rows)
        return matrix.reindex(columns=self.feature_names, fill_value=0)

    def predict(self, domains):
        """Classify each domain; returns one dict per input with the hard
        label and the forest's DGA-class probability."""
        frame = pd.DataFrame({'domain': domains})
        features = self.extract_features_from_dataframe(frame)
        labels = self.rf.predict(features)
        scores = self.rf.predict_proba(features)
        results = []
        for idx, name in enumerate(domains):
            verdict = 'DGA' if labels[idx] == 1 else 'Benign'
            results.append({'domain': name, 'prediction': verdict,
                            'dga_probability': scores[idx][1]})
        return results
def load_model(model_path: str):
    """Deserialize the FANCI Random Forest saved with joblib.

    The pickle was produced from a Colab notebook, so the stored class
    paths are ``__main__.FANCIFeatureExtractor`` and
    ``__main__.FANCIRandomForest``. Registering both classes on
    ``__main__`` before unpickling lets joblib resolve them no matter
    where this module is imported from.
    """
    import sys
    import joblib

    entry_module = sys.modules['__main__']
    for cls in (FANCIFeatureExtractor, FANCIRandomForest):
        setattr(entry_module, cls.__name__, cls)
    return joblib.load(model_path)
def predict(model, domains):
    """Predict DGA vs legit for one domain or a list of domain strings.

    model: FANCIRandomForest instance loaded from joblib.
    domains: a single domain string or a list of them.
    Returns list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
    with the score rounded to 4 decimal places.
    """
    query = [domains] if isinstance(domains, str) else domains
    formatted = []
    for raw in model.predict(query):
        is_dga = raw["prediction"] == "DGA"
        formatted.append({
            "domain": raw["domain"],
            "label": "dga" if is_dga else "legit",
            "score": round(float(raw["dga_probability"]), 4),
        })
    return formatted