"""Inference helpers for a URL phishing classifier stored as a joblib bundle."""

import re
from collections.abc import Mapping
from typing import Any, Dict

import numpy as np
import pandas as pd

# Substrings whose presence in a URL is exposed as one binary feature each
# (``has_<token>``); matched case-insensitively as plain text.
_SUSPICIOUS_TOKENS = [
    "login",
    "verify",
    "secure",
    "update",
    "bank",
    "pay",
    "account",
    "webscr",
]

# Four dot-separated groups of 1-3 digits anywhere in the string. The groups
# are not range-checked, so e.g. "999.999.999.999" also matches.
_IPV4_PATTERN = re.compile(r"(?:\d{1,3}\.){3}\d{1,3}")


def _engineer_features(url_series: pd.Series) -> pd.DataFrame:
    """Build the lexical feature frame for a series of URLs.

    Every value is cast to ``str`` first. The result shares the index of
    ``url_series`` and holds these columns, in order: ``url_len``, the
    character counts ``count_dot``/``count_hyphen``/``count_digit``/
    ``count_at``/``count_qmark``/``count_eq``/``count_slash``,
    ``digit_ratio``, ``has_ip``, one ``has_<token>`` flag per entry of
    ``_SUSPICIOUS_TOKENS``, ``starts_https``, ``ends_with_exe`` and
    ``ends_with_zip``.

    NOTE(review): these definitions presumably have to stay identical to the
    ones used when the model was trained -- confirm before changing any.
    """
    s = url_series.astype(str)
    out = pd.DataFrame(index=s.index)

    out["url_len"] = s.str.len().fillna(0)

    # Character-class counts; the patterns are regexes, hence the escapes.
    out["count_dot"] = s.str.count(r"\.")
    out["count_hyphen"] = s.str.count("-")
    out["count_digit"] = s.str.count(r"\d")
    out["count_at"] = s.str.count("@")
    out["count_qmark"] = s.str.count(r"\?")
    out["count_eq"] = s.str.count("=")
    out["count_slash"] = s.str.count("/")

    # Zero-length URLs would divide by zero; route them through NaN so the
    # ratio comes out as 0.
    out["digit_ratio"] = (
        out["count_digit"] / out["url_len"].replace(0, np.nan)
    ).fillna(0)

    out["has_ip"] = s.str.contains(_IPV4_PATTERN).astype(int)

    for tok in _SUSPICIOUS_TOKENS:
        out[f"has_{tok}"] = s.str.contains(tok, case=False, regex=False).astype(int)

    # Prefix/suffix checks are case-sensitive.
    out["starts_https"] = s.str.startswith("https").astype(int)
    out["ends_with_exe"] = s.str.endswith(".exe").astype(int)
    out["ends_with_zip"] = s.str.endswith(".zip").astype(int)
    return out


def load_bundle(path: str) -> Dict[str, Any]:
    """Load the saved joblib bundle produced by the notebook.

    Returns a dict with keys: model, feature_cols, url_col, label_col,
    model_type.

    Raises:
        ValueError: if the loaded object is not a mapping or lacks any of
            the required keys.
    """
    import joblib  # local import, like the optional backends in predict_url

    # SECURITY: joblib.load is pickle-based and can execute arbitrary code
    # while loading; only open bundles that come from a trusted source.
    bundle = joblib.load(path)
    if not isinstance(bundle, Mapping):
        raise ValueError(
            f"Bundle must be a mapping, got {type(bundle).__name__}"
        )

    required = {"model", "feature_cols", "url_col", "label_col", "model_type"}
    missing = required - set(bundle.keys())
    if missing:
        raise ValueError(f"Bundle missing keys: {missing}")
    return bundle


def predict_url(url: str, bundle: Dict[str, Any], threshold: float = 0.5) -> Dict[str, Any]:
    """Predict phishing probability for a single URL using the saved bundle.

    Args:
        url: The URL to score.
        bundle: Mapping with at least ``url_col``, ``feature_cols`` and
            ``model``; ``model_type`` selects the backend and defaults to
            ``"xgboost_bst"`` when absent.
        threshold: The predicted label is 1 when the probability is greater
            than or equal to this value.

    Returns:
        A dict with keys ``url``, ``phishing_probability``,
        ``predicted_label`` (0 or 1) and ``backend``.

    Raises:
        RuntimeError: if the bundle is of type ``"cuml_rf"`` and ``cudf``
            cannot be imported.
    """
    url_col = bundle["url_col"]
    feature_cols = bundle["feature_cols"]
    model_type = bundle.get("model_type", "xgboost_bst")
    model = bundle["model"]

    row = pd.DataFrame({url_col: [url]})
    feats = _engineer_features(row[url_col])[feature_cols]

    if model_type == "xgboost_bst":
        import xgboost as xgb  # local import to keep base env minimal

        dmat = xgb.DMatrix(feats)
        # NOTE(review): assumes model.predict already yields a probability
        # (e.g. a logistic objective) -- confirm against the training code.
        proba = float(model.predict(dmat)[0])
    elif model_type == "cuml_rf":
        # Only the import is guarded, so a genuine prediction failure is not
        # misreported as a missing dependency.
        try:
            import cudf  # type: ignore
        except Exception as e:  # pragma: no cover
            raise RuntimeError("cudf/cuml required for this bundle but not available") from e
        gfeats = cudf.DataFrame.from_pandas(feats)
        proba = float(model.predict_proba(gfeats)[:, 1].to_pandas().values[0])
    else:
        # Any other backend is expected to expose predict_proba returning a
        # 2-D array indexable as [:, 1].
        proba = float(model.predict_proba(feats)[:, 1][0])

    pred = int(proba >= threshold)
    return {
        "url": url,
        "phishing_probability": proba,
        "predicted_label": pred,
        "backend": model_type,
    }


if __name__ == "__main__":
    # Simple manual test (optional)
    try:
        bundle = load_bundle("rf_url_phishing_xgboost_bst.joblib")
        print(
            predict_url(
                "http://secure-login-account-update.example.com/session?id=123",
                bundle=bundle,
            )
        )
    except FileNotFoundError:
        print("Bundle not found in current directory. This is expected inside the source repo.")