| import re
|
| import joblib
|
| import pandas as pd
|
| import numpy as np
|
| from urllib.parse import urlparse
|
| from typing import Dict, Any
|
|
|
| _SUSPICIOUS_TOKENS = [
|
| "login", "verify", "secure", "update", "bank", "pay", "account", "webscr"
|
| ]
|
| _IPV4_PATTERN = re.compile(r"(?:\d{1,3}\.){3}\d{1,3}")
|
|
|
| _BRAND_NAMES = [
|
| "facebook","paypal","google","amazon","apple","microsoft",
|
| "instagram","netflix","bank","hsbc","linkedin","yahoo","outlook"
|
| ]
|
| _SUSPICIOUS_TLDS = {"zip","xyz","top","ru","kim","support","ltd","work","gq","tk","ml"}
|
|
|
| try:
|
| from rapidfuzz import fuzz
|
| def _sim(a: str, b: str) -> float:
|
| return fuzz.ratio(a, b) / 100.0
|
| except Exception:
|
| import difflib
|
| def _sim(a: str, b: str) -> float:
|
| return difflib.SequenceMatcher(None, a, b).ratio()
|
|
|
|
|
| def _ensure_scheme(u: str) -> str:
|
| return u if re.match(r'^[a-zA-Z][a-zA-Z0-9+.\-]*://', u) else 'http://' + u
|
|
|
|
|
| def _get_hostname(u: str) -> str:
|
| try:
|
| host = urlparse(_ensure_scheme(u)).hostname or ''
|
| try:
|
| host = host.encode('ascii').decode('idna')
|
| except Exception:
|
| pass
|
| return host.lower()
|
| except Exception:
|
| return ''
|
|
|
|
|
| def _get_sld(host: str) -> str:
|
| parts = host.split('.')
|
| if len(parts) >= 2:
|
| return parts[-2]
|
| return host
|
|
|
|
|
| def _get_tld(host: str) -> str:
|
| parts = host.split('.')
|
| return parts[-1] if len(parts) >= 2 else ''
|
|
|
|
|
| def _shannon_entropy(s: str) -> float:
|
| if not s:
|
| return 0.0
|
| counts = {}
|
| for ch in s:
|
| counts[ch] = counts.get(ch, 0) + 1
|
| probs = np.array(list(counts.values()), dtype=float)
|
| probs /= probs.sum()
|
| return float(-(probs * np.log2(probs)).sum())
|
|
|
|
|
| def _clean_for_brand(s: str) -> str:
|
| return re.sub(r'[^a-z]', '', re.sub(r'\d+', '', s.lower()))
|
|
|
|
|
| def _engineer_features(url_series: pd.Series) -> pd.DataFrame:
|
| s = url_series.astype(str)
|
| out = pd.DataFrame(index=s.index)
|
|
|
|
|
| out["url_len"] = s.str.len().fillna(0)
|
| out["count_dot"] = s.str.count(r"\.")
|
| out["count_hyphen"] = s.str.count("-")
|
| out["count_digit"] = s.str.count(r"\d")
|
| out["count_at"] = s.str.count("@")
|
| out["count_qmark"] = s.str.count("\?")
|
| out["count_eq"] = s.str.count("=")
|
| out["count_slash"] = s.str.count("/")
|
| out["digit_ratio"] = (out["count_digit"] / out["url_len"].replace(0, np.nan)).fillna(0)
|
| out["has_ip"] = s.str.contains(_IPV4_PATTERN).astype(int)
|
| for tok in _SUSPICIOUS_TOKENS:
|
| out[f"has_{tok}"] = s.str.contains(tok, case=False, regex=False).astype(int)
|
| out["starts_https"] = s.str.startswith("https").astype(int)
|
| out["ends_with_exe"] = s.str.endswith(".exe").astype(int)
|
| out["ends_with_zip"] = s.str.endswith(".zip").astype(int)
|
|
|
|
|
| host = s.apply(_get_hostname)
|
| sld = host.apply(_get_sld)
|
| tld = host.apply(_get_tld)
|
|
|
| out['host_len'] = host.str.len().fillna(0)
|
| sub_count = host.str.count(r'\.') - 1
|
| out['subdomain_count'] = sub_count.fillna(0).clip(lower=0).astype(int)
|
| out['tld_suspicious'] = tld.isin(list(_SUSPICIOUS_TLDS)).astype(int)
|
| out['has_punycode'] = host.str.contains('xn--', na=False).astype(int)
|
|
|
| out['sld_len'] = sld.str.len().fillna(0)
|
| sld_digit_count = sld.str.count(r'\d')
|
| out['sld_digit_ratio'] = (sld_digit_count / out['sld_len'].replace(0, np.nan)).fillna(0)
|
| out['sld_entropy'] = sld.apply(_shannon_entropy).astype(float)
|
|
|
|
|
| sld_clean = sld.apply(_clean_for_brand)
|
|
|
| def _max_brand_sim(name: str) -> float:
|
| if not isinstance(name, str) or not name:
|
| return 0.0
|
| best = 0.0
|
| for b in _BRAND_NAMES:
|
| sc = _sim(name, b)
|
| if sc > best:
|
| best = sc
|
| return float(best)
|
|
|
| out['max_brand_sim'] = sld_clean.apply(_max_brand_sim).astype(float)
|
| out['like_facebook'] = sld_clean.apply(lambda x: 1 if _sim(x, 'facebook') >= 0.82 else 0).astype(int)
|
|
|
| OFFICIAL_DOMAINS = {
|
| 'facebook': ['facebook.com'],
|
| 'paypal': ['paypal.com'],
|
| 'google': ['google.com'],
|
| 'amazon': ['amazon.com'],
|
| 'apple': ['apple.com'],
|
| 'microsoft': ['microsoft.com'],
|
| 'instagram': ['instagram.com'],
|
| 'netflix': ['netflix.com'],
|
| 'hsbc': ['hsbc.com'],
|
| 'linkedin': ['linkedin.com'],
|
| 'yahoo': ['yahoo.com'],
|
| 'outlook': ['outlook.com']
|
| }
|
|
|
| def _normalize_leet(name: str) -> str:
|
| if not isinstance(name, str):
|
| return ''
|
| table = str.maketrans({'0':'o','1':'l','3':'e','4':'a','5':'s','7':'t','2':'z','8':'b'})
|
| return name.translate(table)
|
|
|
| def _best_brand(name: str):
|
| if not isinstance(name, str) or not name:
|
| return '', 0.0
|
| best_b, best_s = '', 0.0
|
| for b in _BRAND_NAMES:
|
| sc = _sim(name, b)
|
| if sc > best_s:
|
| best_b, best_s = b, sc
|
| return best_b, float(best_s)
|
|
|
| def _get_etld1(h: str) -> str:
|
| parts = h.split('.') if isinstance(h, str) else []
|
| if len(parts) >= 2:
|
| return parts[-2] + '.' + parts[-1]
|
| return h
|
|
|
| etld1 = host.apply(_get_etld1)
|
| brand_best_and_sim = sld_clean.apply(_best_brand)
|
| brand_best = brand_best_and_sim.apply(lambda x: x[0])
|
| brand_best_sim = brand_best_and_sim.apply(lambda x: x[1])
|
|
|
| out['is_official_brand_domain'] = [
|
| 1 if bb and et in OFFICIAL_DOMAINS.get(bb, []) else 0
|
| for bb, et in zip(brand_best, etld1)
|
| ]
|
|
|
| out['brand_digit_insertion'] = ((sld_clean == brand_best) & (sld.str.contains(r'\d'))).astype(int)
|
|
|
| sld_leet_norm = sld.apply(_normalize_leet).apply(_clean_for_brand)
|
| def _max_brand_sim_leet(name: str) -> float:
|
| if not isinstance(name, str) or not name:
|
| return 0.0
|
| best = 0.0
|
| for b in _BRAND_NAMES:
|
| sc = _sim(name, b)
|
| if sc > best:
|
| best = sc
|
| return float(best)
|
| out['max_brand_sim_leet'] = sld_leet_norm.apply(_max_brand_sim_leet).astype(float)
|
| out['like_brand_leet'] = (out['max_brand_sim_leet'] >= 0.88).astype(int)
|
|
|
| def _contains_brand_extra(name: str) -> int:
|
| if not isinstance(name, str) or not name:
|
| return 0
|
| for b in _BRAND_NAMES:
|
| if name != b and b in name:
|
| return 1
|
| return 0
|
| out['sld_contains_brand_extra'] = sld_clean.apply(_contains_brand_extra).astype(int)
|
|
|
| out['brand_impersonation'] = (
|
| ((brand_best_sim >= 0.88) | (out['like_brand_leet'] == 1) | (out['sld_contains_brand_extra'] == 1))
|
| & (out['is_official_brand_domain'] == 0)
|
| ).astype(int)
|
|
|
| out['sld_has_hyphen'] = sld.str.contains('-', na=False).astype(int)
|
| out['sld_has_digits'] = (sld.str.count(r'\d') > 0).astype(int)
|
|
|
| return out
|
|
|
|
|
| def load_bundle(path: str) -> Dict[str, Any]:
|
| """Load the saved joblib bundle produced by the notebook.
|
|
|
| Returns a dict with keys: model, feature_cols, url_col, label_col, model_type
|
| """
|
| bundle = joblib.load(path)
|
| required = {"model", "feature_cols", "url_col", "label_col", "model_type"}
|
| missing = required - set(bundle.keys())
|
| if missing:
|
| raise ValueError(f"Bundle missing keys: {missing}")
|
| return bundle
|
|
|
|
|
| def predict_url(url: str, bundle: Dict[str, Any], threshold: float = 0.5) -> Dict[str, Any]:
|
| """Predict phishing probability for a single URL using the saved bundle.
|
|
|
| Applies a rule-based typosquatting guard to catch cases like face123book.com
|
| even if the model probability is low.
|
| """
|
| url_col = bundle["url_col"]
|
| feature_cols = bundle["feature_cols"]
|
| trained_feature_cols = bundle.get("trained_feature_cols")
|
| model_type = bundle.get("model_type", "xgboost_bst")
|
| model = bundle["model"]
|
|
|
| row = pd.DataFrame({url_col: [url]})
|
| feats_full = _engineer_features(row[url_col])
|
| desired_cols = list(trained_feature_cols) if trained_feature_cols is not None else list(feature_cols)
|
| feats = feats_full.reindex(columns=desired_cols, fill_value=0)
|
|
|
| if model_type == "xgboost_bst":
|
| import xgboost as xgb
|
| dmat = xgb.DMatrix(feats)
|
| proba = float(model.predict(dmat)[0])
|
| elif model_type == "cuml_rf":
|
| try:
|
| import cudf
|
| gfeats = cudf.DataFrame.from_pandas(feats)
|
| proba = float(model.predict_proba(gfeats)[:, 1].to_pandas().values[0])
|
| except Exception as e:
|
| raise RuntimeError("cudf/cuml required for this bundle but not available") from e
|
| else:
|
| proba = float(model.predict_proba(feats)[:, 1][0])
|
|
|
|
|
| def _bool(feature: str, default: int = 0) -> int:
|
| return int(feature in feats_full.columns and bool(feats_full.iloc[0].get(feature, default)))
|
|
|
| def _float(feature: str, default: float = 0.0) -> float:
|
| return float(feats_full.iloc[0].get(feature, default)) if feature in feats_full.columns else default
|
|
|
| like_brand = (
|
| _bool('brand_impersonation') == 1 or
|
| _bool('like_brand_leet') == 1 or
|
| _float('max_brand_sim_leet') >= 0.90 or
|
| _float('max_brand_sim') >= 0.90 or
|
| _bool('sld_contains_brand_extra') == 1
|
| )
|
| risky_host = (
|
| _bool('is_official_brand_domain') == 0 and
|
| (
|
| _bool('sld_has_digits') == 1 or
|
| _bool('sld_has_hyphen') == 1 or
|
| _bool('tld_suspicious') == 1 or
|
| _bool('has_punycode') == 1
|
| )
|
| )
|
| rule_triggered = bool(like_brand and risky_host)
|
|
|
| pred = int(proba >= threshold)
|
| if rule_triggered and pred == 0:
|
| pred = 1
|
| proba = max(proba, 0.9)
|
|
|
| result = {
|
| "url": url,
|
| "phishing_probability": proba,
|
| "predicted_label": pred,
|
| "backend": model_type,
|
| }
|
| if rule_triggered:
|
| result["rule"] = "typosquat_guard"
|
| return result
|
|
|
|
|
| if __name__ == "__main__":
|
|
|
| try:
|
| bundle = load_bundle("models/rf_url_phishing_xgboost_bst.joblib")
|
| print(
|
| predict_url(
|
| "www.face123book.com",
|
| bundle=bundle,
|
| )
|
| )
|
| except FileNotFoundError:
|
| print("Bundle not found in current directory. This is expected inside the source repo.")
|
|
|
|
|
|
|