""" feature_engineering.py ====================== Feature pipeline for the CYB004 baseline classifier. Predicts `campaign_phase` (7-class) from per-timestep phishing campaign trajectory data on the CYB004 sample dataset. CSV inputs: campaign_trajectories.csv (primary, one row per timestep, 100 campaigns x ~40 timesteps = 3,952 rows) victim_topology.csv (per-department victim configuration, joined on target_department_id) campaign_summary.csv (per-campaign aggregates; reserved for future work) campaign_events.csv (discrete event log; reserved for future work) Target classes (7 phases observed in the sample): target_reconnaissance, infrastructure_setup, lure_crafting, email_delivery, victim_engagement, credential_harvesting, post_compromise_escalation This is the email-security / SOC use case: given the observable campaign telemetry at a moment in time, what phase of the phishing lifecycle is the campaign in? The pivot to campaign_phase (away from actor_capability_tier, the README's headline use case) happened because per-campaign-constant features (lure_personalisation_score, click_through_rate, credential_submission_rate, target_department_id) leak tier via the small test fold under group-aware splitting. With those features removed, honest tier prediction is below majority baseline. The full 335k-row CYB004 dataset would address this; the sample does not. See the model card for full discussion. Public API ---------- build_features(trajectories_path, topology_path) -> (X, y, groups, meta) transform_single(record, meta, victim_aggregates=None) -> np.ndarray save_meta(meta, path) / load_meta(path) build_department_lookup(topology_path) -> dict License ------- Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching the dataset license. See README.md. 
""" from __future__ import annotations import json from pathlib import Path from typing import Any import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Label space # --------------------------------------------------------------------------- LABEL_ORDER = [ "target_reconnaissance", "infrastructure_setup", "lure_crafting", "email_delivery", "victim_engagement", "credential_harvesting", "post_compromise_escalation", ] LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)} INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()} # --------------------------------------------------------------------------- # Identifier and target columns - not features # --------------------------------------------------------------------------- ID_COLUMNS = ["campaign_id", "actor_id"] TARGET_COLUMN = "campaign_phase" # `actor_capability_tier` is kept as a feature - it's a real SOC observable # (analysts typically have an actor cluster hypothesis), and its # purity-vs-phase is 0.18 (uniform baseline 0.14), so it isn't an oracle. # `delivery_outcome` is dropped: its purity vs phase is much higher # (0.36) - `no_delivery` appears only in early phases, effectively # encoding phase position. Keeping it would give the model a near-oracle. 
LEAKY_COLUMNS = [
    "delivery_outcome",
]

# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------

DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",                    # strong but non-deterministic phase signal
    "emails_sent_cumulative",      # increases through campaign; useful position proxy
    "click_through_rate",          # per-campaign constant; informative when combined with timestep
    "credential_submission_rate",  # per-campaign constant
    "gateway_detection_score",     # per-step variation
    "lure_personalisation_score",  # per-campaign constant; tier signal
    "target_department_id",        # per-campaign constant; treated as ordinal ID
]

# Per-timestep categoricals
CATEGORICAL_TIMESTEP_FEATURES = [
    "evasion_technique_active",    # 6 levels incl. "none" (82%); active evasion correlates with mid-late phases
    "actor_capability_tier",       # 4 levels; mostly per-campaign constant
]

# ---------------------------------------------------------------------------
# Victim topology features (joined on target_department_id)
# ---------------------------------------------------------------------------

TOPOLOGY_NUMERIC_FEATURES = [
    "employee_count",
    "privileged_account_density",
    "mfa_enrollment_rate",
    "click_susceptibility_base",
    "email_volume_daily",
]

TOPOLOGY_CATEGORICAL_FEATURES = [
    "department_type",
    "industry_sector",
    "awareness_training_level",
    "gateway_architecture",
    "dmarc_enforcement_level",
]


# ---------------------------------------------------------------------------
# Engineered features (none derived from phase or timestep alone)
# ---------------------------------------------------------------------------

def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with six engineered columns appended.

    None directly encode phase; each is a behavioural composite that helps
    disambiguate adjacent phases. The input frame is not modified.

    Requires the columns ``emails_sent_cumulative``,
    ``gateway_detection_score``, ``evasion_technique_active``,
    ``lure_personalisation_score``, ``credential_submission_rate`` and
    ``click_through_rate``; a missing one raises ``KeyError``.
    """
    df = df.copy()

    # 1. Log-scaled email volume. emails_sent_cumulative is heavy-tailed
    #    (0 in recon, hundreds-to-thousands by post_compromise).
    #    Negative counts are clipped to 0 so log1p stays defined.
    df["log_emails_sent"] = np.log1p(df["emails_sent_cumulative"].clip(lower=0)).astype(float)

    # 2. Gateway-blocked step. gateway_detection_score > 0.7 marks
    #    high-confidence gateway intervention; common in email_delivery.
    df["is_gateway_blocked_step"] = (df["gateway_detection_score"] > 0.7).astype(int)

    # 3. Evasion-active flag. Non-"none" evasion_technique_active
    #    concentrates in lure_crafting and email_delivery.
    #    A missing technique counts as inactive: `NaN != "none"` is True, so
    #    without the notna() guard an unknown value would be flagged active
    #    while its one-hot block is all zeros.
    evasion = df["evasion_technique_active"]
    df["is_evasion_active"] = (evasion.notna() & (evasion != "none")).astype(int)

    # 4. High-personalisation flag. lure_personalisation_score > 0.7 is
    #    an APT-tier signature.
    df["is_high_personalisation"] = (df["lure_personalisation_score"] > 0.7).astype(int)

    # 5. Has credential capture flag. credential_submission_rate > 0
    #    indicates the campaign has reached credential-capture phases.
    df["has_credential_capture"] = (df["credential_submission_rate"] > 0).astype(int)

    # 6. Engaged-victim flag. click_through_rate > 0 indicates
    #    victim_engagement or later phase.
    df["has_user_engagement"] = (df["click_through_rate"] > 0).astype(int)

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    trajectories_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load CSVs, join topology, drop target + leaky columns, engineer
    features, one-hot encode, return (X, y, groups, meta).

    `groups` is a Series of campaign_id values aligned with X. Use it with
    GroupShuffleSplit / GroupKFold: a single campaign generates ~40
    correlated timesteps; row-level random splitting inflates metrics.

    `meta` records the feature names, numeric feature list, the categorical
    levels seen during fitting, the label mappings and the excluded leaky
    columns, so `transform_single` can reproduce the same encoding.

    Raises:
        ValueError: if the target column contains a value that is not in
            LABEL_ORDER, or (as ``pandas.errors.MergeError``, a ValueError
            subclass) if the topology file lists a ``department_id`` more
            than once.
    """
    traj = pd.read_csv(trajectories_path)
    topo = pd.read_csv(topology_path)

    y = traj[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = traj.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown campaign_phase values: {bad}")
    y = y.astype(int)

    groups = traj["campaign_id"].copy()

    traj = traj.drop(columns=ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS, errors="ignore")

    topo_cols_needed = (
        ["department_id"]
        + TOPOLOGY_NUMERIC_FEATURES
        + TOPOLOGY_CATEGORICAL_FEATURES
    )
    # validate="many_to_one": a duplicated department_id would multiply
    # trajectory rows and silently break the row alignment between X and
    # y / groups, so fail loudly instead.
    traj = traj.merge(
        topo[topo_cols_needed],
        left_on="target_department_id",
        right_on="department_id",
        how="left",
        validate="many_to_one",
    ).drop(columns=["department_id"], errors="ignore")

    traj = _add_engineered_features(traj)

    numeric_features = (
        DIRECT_NUMERIC_TIMESTEP_FEATURES
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "log_emails_sent",
            "is_gateway_blocked_step",
            "is_evasion_active",
            "is_high_personalisation",
            "has_credential_capture",
            "has_user_engagement",
        ]
    )
    X_numeric = traj[numeric_features].astype(float)

    categorical_columns = CATEGORICAL_TIMESTEP_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES

    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in categorical_columns:
        if col not in traj.columns:
            continue
        # Sorted levels give a deterministic one-hot column order; they are
        # stored in meta so inference uses the identical layout.
        levels = sorted(traj[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            traj[col].astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        blocks.append(block)

    # Missing values (e.g. departments absent from the topology file) are
    # encoded as 0 after the blocks are stitched together.
    X = pd.concat(
        [X_numeric.reset_index(drop=True)] + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "leakage_excluded": LEAKY_COLUMNS,
    }
    return X, y, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    victim_aggregates: dict | None = None,
) -> np.ndarray:
    """
    Encode a single timestep record for inference.

    Args:
        record: one timestep as a dict, or a DataFrame of one or more
            timesteps. It must carry the raw columns that
            `_add_engineered_features` reads. The input is not modified.
        meta: the metadata returned by `build_features` / `load_meta`;
            `numeric_features`, `categorical_levels` and `feature_names`
            are used.
        victim_aggregates: optional {column: value} mapping (for example an
            entry from `build_department_lookup`) written onto every row
            before encoding.

    Returns:
        float32 array of shape (n_rows, len(meta["feature_names"])), with
        columns in `meta["feature_names"]` order. Numeric features missing
        from the record, missing values and unseen categorical levels are
        encoded as 0.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Use a positional index: the numeric block below is rebuilt from raw
        # values, so a caller-supplied index (e.g. a slice of a larger frame)
        # would otherwise misalign the blocks in pd.concat and add rows.
        df = record.copy().reset_index(drop=True)

    if victim_aggregates is not None:
        for k, v in victim_aggregates.items():
            df[k] = v

    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows, index=df.index)).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows, index=df.index))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        # Guarantee one column per training-time level, in training order.
        block = block.reindex(columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0)
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the feature metadata to `path` as indented JSON.

    The integer keys of `int_to_label` are written as strings because JSON
    object keys must be strings; `load_meta` converts them back.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "leakage_excluded": meta.get("leakage_excluded", []),
    }
    # Explicit encoding so the file does not depend on the platform locale.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by `save_meta`, restoring int keys of `int_to_label`."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_department_lookup(topology_path: str | Path) -> dict[int, dict]:
    """
    Build {department_id: {topology features}} for inference-time lookup.

    Only topology feature columns that are present in the file are
    included. If a department_id appears more than once, the last row wins.
    """
    topo = pd.read_csv(topology_path)
    # Column availability does not change per row, so resolve it once.
    present = [
        c
        for c in TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
        if c in topo.columns
    ]
    out = {}
    for _, row in topo.iterrows():
        out[int(row["department_id"])] = {c: row[c] for c in present}
    return out


def _main(argv: list[str]) -> None:
    """Smoke-test the pipeline on the CSV directory given as argv[1]."""
    base = Path(argv[1]) if len(argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, groups, meta = build_features(
        base / "campaign_trajectories.csv",
        base / "victim_topology.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} campaigns")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")


if __name__ == "__main__":
    import sys

    _main(sys.argv)