File size: 6,756 Bytes
7189410
 
 
 
f63c190
 
 
7189410
 
f63c190
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7189410
 
f63c190
 
 
 
 
 
 
7189410
f63c190
 
 
 
 
7189410
 
 
 
 
 
f63c190
7189410
 
 
 
 
f63c190
7189410
f63c190
 
 
 
 
 
7189410
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
DGA-FANCI: Random Forest with 27 FANCI features for DGA detection.
Trained on 54 DGA families.
File size: ~1 GB (Random Forest with many estimators).

IMPORTANT: FANCIFeatureExtractor and FANCIRandomForest must be defined here
so that joblib can deserialize the saved model object.
"""

import re
import math
from collections import Counter
import numpy as np
import pandas as pd
import tldextract


class FANCIFeatureExtractor:
    """Compute the 27 FANCI lexical features for a single domain name.

    IMPORTANT: this class must keep its name and module-level visibility so
    that joblib can deserialize saved model objects that reference it.
    """

    def __init__(self):
        # TLDs the feature set treats as "valid"; anything else lowers
        # has_valid_tld / contains_tld_as_subdomain.
        self.valid_tlds = {'com', 'org', 'net', 'edu', 'gov', 'mil', 'int', 'arpa',
                           'de', 'uk', 'fr', 'it', 'es', 'ru', 'cn', 'jp', 'br', 'au', 'ca'}
        self.vowels = set('aeiouAEIOU')
        self.consonants = set('bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ')

    def extract_e2ld(self, domain):
        """Return the effective 2nd-level domain (e2ld) of *domain*.

        For dynamic-DNS suffixes the subdomain is kept as part of the e2ld,
        since the registrant controls one level deeper there.
        Falls back to the raw input if tldextract fails.
        """
        try:
            extracted = tldextract.extract(domain)
            # NOTE(review): this compares the *public suffix* against two
            # dynamic-DNS providers — assumes tldextract's private-suffix
            # list is enabled; verify against the deployed PSL snapshot.
            if extracted.suffix in ['dyndns.org', 'ddns.net']:
                return f"{extracted.subdomain}.{extracted.domain}" if extracted.subdomain else extracted.domain
            return extracted.domain
        except Exception:
            # Best-effort fallback: a bare `except:` would also swallow
            # SystemExit/KeyboardInterrupt, so catch Exception instead.
            return domain

    def get_dot_free_public_suffix_free(self, domain):
        """Return the e2ld stripped of every non-alphanumeric character."""
        e2ld = self.extract_e2ld(domain)
        return re.sub(r'[^a-zA-Z0-9]', '', e2ld)

    def _is_prefix_repetition(self, domain):
        """1 if every middle label repeats the first label as a prefix.

        The final label (the TLD) is deliberately excluded from the check.
        Returns 0 for single-label inputs.
        """
        parts = domain.split('.')
        if len(parts) < 2:
            return 0
        base = parts[0]
        for i in range(1, len(parts) - 1):
            if not parts[i].startswith(base):
                return 0
        return 1

    def _is_hex(self, s):
        """True iff *s* is a non-empty string of hex digits only.

        Uses a strict regex rather than int(s, 16): the int parser also
        accepts signs, surrounding whitespace and underscore separators
        (e.g. int('1_2', 16) == 18), which wrongly flagged such labels
        as hexadecimal.
        """
        return bool(re.fullmatch(r'[0-9a-fA-F]+', s))

    def _contains_ip(self, domain):
        """True if a dotted-quad IPv4-looking pattern appears in *domain*."""
        return bool(re.search(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', domain))

    def _consecutive_ratio(self, text, char_set):
        """Fraction of *text* covered by runs (length >= 2) of chars in *char_set*."""
        if not text:
            return 0
        res, cur = [], 0
        for c in text:
            if c in char_set:
                cur += 1
            else:
                if cur >= 2:  # only runs of two or more count
                    res.append(cur)
                cur = 0
        if cur >= 2:  # flush a run that reaches the end of the string
            res.append(cur)
        return sum(res) / max(1, len(text))

    def _ngram_features(self, text):
        """Distribution statistics over per-character (unigram) frequencies.

        Despite the 'ngram' key names, these are computed from single-character
        counts of *text*. Empty input yields all-zero features.
        """
        if not text:
            return {k: 0 for k in ['ngram_mean', 'ngram_std', 'ngram_min', 'ngram_max',
                                    'ngram_median', 'ngram_q25', 'ngram_q75']}
        freqs = list(Counter(text).values())
        return {
            'ngram_mean': np.mean(freqs),
            'ngram_std': np.std(freqs),
            'ngram_min': np.min(freqs),
            'ngram_max': np.max(freqs),
            'ngram_median': np.median(freqs),
            'ngram_q25': np.percentile(freqs, 25),
            'ngram_q75': np.percentile(freqs, 75),
        }

    def _calculate_entropy(self, text):
        """Shannon entropy (bits) of the character distribution of *text*."""
        if not text:
            return 0
        probs = [c / len(text) for c in Counter(text).values()]
        return -sum(p * math.log2(p) for p in probs)

    def extract_features(self, domain):
        """Return the full 27-feature dict for one domain string.

        The domain is lowercased/stripped first; most character-level
        features are computed on the dot-free, suffix-free e2ld (ddsf).
        """
        domain = domain.lower().strip()
        e2ld = self.extract_e2ld(domain)
        parts = domain.split('.')
        ddsf = self.get_dot_free_public_suffix_free(domain)
        f = {}
        f['domain_length'] = len(domain)
        f['num_subdomains'] = max(0, len(parts) - 2)
        # NOTE: parts[:-1] includes the registered-domain label, not only
        # true subdomains — kept as-is to match the trained model.
        f['subdomain_length_mean'] = np.mean([len(p) for p in parts[:-1]]) if len(parts) > 1 else len(e2ld)
        f['has_www_prefix'] = 1 if domain.startswith('www.') else 0
        f['has_valid_tld'] = 1 if parts[-1] in self.valid_tlds else 0
        f['contains_single_char_subdomain'] = 1 if any(len(p) == 1 for p in parts[:-1]) else 0
        f['is_exclusive_prefix_repetition'] = self._is_prefix_repetition(domain)
        f['contains_tld_as_subdomain'] = 1 if any(p in self.valid_tlds for p in parts[:-1]) else 0
        total_sub = max(1, len(parts) - 1)
        f['ratio_digit_exclusive_subdomains'] = sum(1 for p in parts[:-1] if p.isdigit()) / total_sub
        f['ratio_hex_exclusive_subdomains'] = sum(1 for p in parts[:-1] if self._is_hex(p)) / total_sub
        # ddsf has non-alphanumerics stripped, so this ratio is always 0;
        # kept because the trained model expects the column.
        f['underscore_ratio'] = ddsf.count('_') / max(1, len(ddsf))
        f['contains_ip_address'] = 1 if self._contains_ip(domain) else 0
        f['contains_digits'] = 1 if any(c.isdigit() for c in ddsf) else 0
        f['vowel_ratio'] = sum(1 for c in ddsf if c in self.vowels) / max(1, len(ddsf))
        f['digit_ratio'] = sum(1 for c in ddsf if c.isdigit()) / max(1, len(ddsf))
        f['alphabet_cardinality'] = len(set(ddsf))
        counts = Counter(ddsf)
        f['ratio_repeated_characters'] = sum(1 for c in counts.values() if c > 1) / max(1, len(set(ddsf)))
        f['ratio_consecutive_consonants'] = self._consecutive_ratio(ddsf, self.consonants)
        f['ratio_consecutive_digits'] = self._consecutive_ratio(ddsf, set('0123456789'))
        f.update(self._ngram_features(ddsf))
        f['entropy'] = self._calculate_entropy(ddsf)
        return f


class FANCIRandomForest:
    """Thin wrapper pairing a trained Random Forest with the FANCI pipeline.

    NOTE(review): ``feature_extractor``, ``feature_names`` and ``rf`` are
    never assigned in ``__init__`` — they are restored as instance state
    when the pickled model is loaded via joblib.
    """

    def __init__(self):
        pass

    def extract_features_from_dataframe(self, df):
        """Build the feature matrix for every entry in df['domain']."""
        rows = []
        for name in df['domain']:
            rows.append(self.feature_extractor.extract_features(name))
        matrix = pd.DataFrame(rows)
        # Align columns to the training-time order; missing columns become 0.
        return matrix.reindex(columns=self.feature_names, fill_value=0)

    def predict(self, domains):
        """Classify each domain; returns one result dict per input."""
        features = self.extract_features_from_dataframe(pd.DataFrame({'domain': domains}))
        labels = self.rf.predict(features)
        scores = self.rf.predict_proba(features)
        results = []
        for idx, name in enumerate(domains):
            results.append({
                'domain': name,
                'prediction': 'DGA' if labels[idx] == 1 else 'Benign',
                'dga_probability': scores[idx][1],
            })
        return results


def load_model(model_path: str):
    """Load the FANCI Random Forest from a joblib file.

    The model was pickled from a Colab notebook (``__main__`` context), so
    joblib resolves ``FANCIFeatureExtractor`` and ``FANCIRandomForest``
    against ``__main__``. Both classes are injected there first so the
    unpickling can succeed.
    """
    import sys
    import joblib

    main_module = sys.modules['__main__']
    setattr(main_module, 'FANCIFeatureExtractor', FANCIFeatureExtractor)
    setattr(main_module, 'FANCIRandomForest', FANCIRandomForest)

    return joblib.load(model_path)


def predict(model, domains):
    """
    Predict DGA vs legit for domain strings.

    model: FANCIRandomForest instance loaded from joblib.
    domains: one domain string, or a list of them.
    Returns list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
    """
    # Accept a single bare string for caller convenience.
    domain_list = [domains] if isinstance(domains, str) else domains

    formatted = []
    for raw in model.predict(domain_list):
        label = "dga" if raw["prediction"] == "DGA" else "legit"
        formatted.append({
            "domain": raw["domain"],
            "label": label,
            "score": round(float(raw["dga_probability"]), 4),
        })
    return formatted