| """ |
| feature_engineering.py |
| ====================== |
| |
| Feature pipeline for the CYB004 baseline classifier. |
| |
| Predicts `campaign_phase` (7-class) from per-timestep phishing campaign |
| trajectory data on the CYB004 sample dataset. |
| |
| CSV inputs: |
| campaign_trajectories.csv (primary, one row per timestep, 100 |
| campaigns x ~40 timesteps = 3,952 rows) |
| victim_topology.csv (per-department victim configuration, |
| joined on target_department_id) |
| campaign_summary.csv (per-campaign aggregates; reserved for |
| future work) |
| campaign_events.csv (discrete event log; reserved for |
| future work) |
| |
| Target classes (7 phases observed in the sample): |
| target_reconnaissance, infrastructure_setup, lure_crafting, |
| email_delivery, victim_engagement, credential_harvesting, |
| post_compromise_escalation |
| |
| This is the email-security / SOC use case: given the observable |
| campaign telemetry at a moment in time, what phase of the phishing |
| lifecycle is the campaign in? |
| |
| The pivot to campaign_phase (away from actor_capability_tier, the |
| README's headline use case) happened because per-campaign-constant |
| features (lure_personalisation_score, click_through_rate, |
| credential_submission_rate, target_department_id) leak tier via the |
| small test fold under group-aware splitting. With those features |
| removed, honest tier prediction is below majority baseline. The full |
| 335k-row CYB004 dataset would address this; the sample does not. |
| See the model card for full discussion. |
| |
| Public API |
| ---------- |
| build_features(trajectories_path, topology_path) |
| -> (X, y, groups, meta) |
| transform_single(record, meta, victim_aggregates=None) -> np.ndarray |
| save_meta(meta, path) / load_meta(path) |
| build_department_lookup(topology_path) -> dict |
| |
| License |
| ------- |
| Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching |
| the dataset license. See README.md. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
| |
| |
|
|
# ---------------------------------------------------------------------------
# Target labels
# ---------------------------------------------------------------------------

# Phase names; a label's position in this list is its integer class id.
LABEL_ORDER = [
    "target_reconnaissance",
    "infrastructure_setup",
    "lure_crafting",
    "email_delivery",
    "victim_engagement",
    "credential_harvesting",
    "post_compromise_escalation",
]
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))

# ---------------------------------------------------------------------------
# Columns removed from the feature matrix
# ---------------------------------------------------------------------------

# Identifier columns, dropped before features are built (campaign_id is
# still returned separately as the grouping key by build_features).
ID_COLUMNS = ["campaign_id", "actor_id"]
TARGET_COLUMN = "campaign_phase"

# Dropped alongside the ids/target and recorded in meta["leakage_excluded"].
# NOTE(review): presumably these leak the target -- see the model card.
LEAKY_COLUMNS = [
    "delivery_outcome",
]

# ---------------------------------------------------------------------------
# Per-timestep features (columns of campaign_trajectories.csv)
# ---------------------------------------------------------------------------

# Used as-is and cast to float.
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",
    "emails_sent_cumulative",
    "click_through_rate",
    "credential_submission_rate",
    "gateway_detection_score",
    "lure_personalisation_score",
    "target_department_id",
]

# One-hot encoded.
CATEGORICAL_TIMESTEP_FEATURES = [
    "evasion_technique_active",
    "actor_capability_tier",
]

# ---------------------------------------------------------------------------
# Topology features (columns of victim_topology.csv, joined per department)
# ---------------------------------------------------------------------------

# Cast to float.
TOPOLOGY_NUMERIC_FEATURES = [
    "employee_count",
    "privileged_account_density",
    "mfa_enrollment_rate",
    "click_susceptibility_base",
    "email_volume_daily",
]

# One-hot encoded.
TOPOLOGY_CATEGORICAL_FEATURES = [
    "department_type",
    "industry_sector",
    "awareness_training_level",
    "gateway_architecture",
    "dmarc_enforcement_level",
]
|
|
|
|
| |
| |
| |
|
|
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with six engineered columns appended.

    None directly encode phase; each is a behavioural composite that helps
    disambiguate adjacent phases.

    Added columns:
        log_emails_sent          log1p of emails_sent_cumulative, with
                                 negative counts clipped to 0 first.
        is_gateway_blocked_step  1 when gateway_detection_score > 0.7.
        is_evasion_active        1 when evasion_technique_active is present
                                 and is not the string "none".
        is_high_personalisation  1 when lure_personalisation_score > 0.7.
        has_credential_capture   1 when credential_submission_rate > 0.
        has_user_engagement      1 when click_through_rate > 0.

    A missing value (NaN / None) in a source column always yields a 0 flag.

    The input frame is not modified. Raises KeyError if any of the source
    columns is absent from `df`.
    """
    out = df.copy()

    # Cumulative counts are compressed with log1p; clip guards against
    # negative inputs, for which log1p would be undefined or negative.
    clipped_emails = out["emails_sent_cumulative"].clip(lower=0)
    out["log_emails_sent"] = np.log1p(clipped_emails).astype(float)

    out["is_gateway_blocked_step"] = (
        out["gateway_detection_score"] > 0.7
    ).astype(int)

    # NaN != "none" evaluates to True, so a bare inequality would report a
    # missing technique as active evasion. Require the value to be present,
    # matching the numeric flags (NaN comparisons are False -> 0).
    evasion = out["evasion_technique_active"]
    out["is_evasion_active"] = (evasion.notna() & (evasion != "none")).astype(int)

    out["is_high_personalisation"] = (
        out["lure_personalisation_score"] > 0.7
    ).astype(int)

    out["has_credential_capture"] = (
        out["credential_submission_rate"] > 0
    ).astype(int)

    out["has_user_engagement"] = (out["click_through_rate"] > 0).astype(int)

    return out
|
|
|
|
| |
| |
| |
|
|
def build_features(
    trajectories_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load CSVs, join topology, drop target + leaky columns, engineer features,
    one-hot encode, return (X, y, groups, meta).

    Returns
    -------
    X : DataFrame with the numeric features (cast to float) followed by the
        one-hot blocks; remaining NaN are filled with 0.0. Rows are in the
        same order as the trajectories CSV.
    y : Series of integer class ids (see LABEL_TO_INT).
    groups : Series of campaign_id values aligned with X. Use it with
        GroupShuffleSplit / GroupKFold: a single campaign generates ~40
        correlated timesteps; row-level random splitting inflates metrics.
    meta : dict with keys feature_names, numeric_features,
        categorical_levels, label_to_int, int_to_label, leakage_excluded.
        The label maps and the leakage list are copies, so mutating `meta`
        does not touch the module-level constants.

    Raises
    ------
    ValueError
        If the trajectories contain a campaign_phase not in LABEL_ORDER, or
        if the topology lists the same department_id more than once.
    """
    traj = pd.read_csv(trajectories_path)
    topo = pd.read_csv(topology_path)

    y = traj[TARGET_COLUMN].map(LABEL_TO_INT)
    unknown = y.isna()
    if unknown.any():
        bad = traj.loc[unknown, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown campaign_phase values: {bad}")
    y = y.astype(int)
    groups = traj["campaign_id"].copy()

    traj = traj.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS,
        errors="ignore",
    )

    # y and groups were taken before the join, so the join must not add rows.
    # A repeated department_id would duplicate every matching timestep and
    # silently misalign X with y / groups.
    repeated = topo["department_id"].duplicated(keep=False)
    if repeated.any():
        dup_ids = topo.loc[repeated, "department_id"].unique().tolist()
        raise ValueError(
            f"Duplicate department_id values in topology: {dup_ids}"
        )

    topo_cols_needed = (
        ["department_id"]
        + TOPOLOGY_NUMERIC_FEATURES
        + TOPOLOGY_CATEGORICAL_FEATURES
    )
    traj = traj.merge(
        topo[topo_cols_needed],
        left_on="target_department_id",
        right_on="department_id",
        how="left",
    ).drop(columns=["department_id"], errors="ignore")

    traj = _add_engineered_features(traj)

    numeric_features = (
        DIRECT_NUMERIC_TIMESTEP_FEATURES
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "log_emails_sent", "is_gateway_blocked_step", "is_evasion_active",
            "is_high_personalisation", "has_credential_capture",
            "has_user_engagement",
        ]
    )
    X_numeric = traj[numeric_features].astype(float)

    # One-hot encode each categorical column against its sorted observed
    # levels; the levels are stored in meta so inference can reproduce the
    # exact same columns.
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_TIMESTEP_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES:
        if col not in traj.columns:
            continue
        levels = sorted(traj[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            traj[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": dict(LABEL_TO_INT),
        "int_to_label": dict(INT_TO_LABEL),
        "leakage_excluded": list(LEAKY_COLUMNS),
    }
    return X, y, groups, meta
|
|
|
|
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    victim_aggregates: dict | None = None,
) -> np.ndarray:
    """
    Encode one timestep record (or a frame of records) for inference.

    Parameters
    ----------
    record : a dict describing one timestep, or a DataFrame with one row per
        timestep. The input is not modified.
    meta : metadata as returned by build_features / load_meta; the keys
        numeric_features, categorical_levels and feature_names are read.
    victim_aggregates : optional {column: value} mapping assigned onto every
        row before encoding (e.g. one entry of build_department_lookup).

    Returns
    -------
    float32 array of shape (n_rows, len(meta["feature_names"])), columns in
    meta["feature_names"] order. Numeric or categorical columns that are
    absent, and any remaining NaN, are encoded as 0.

    Raises KeyError if a column required by the feature-engineering step is
    missing from the record.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # The numeric block below is rebuilt positionally (0..n-1). Without a
        # matching index here, pd.concat(axis=1) would outer-join the blocks
        # on mismatched labels and emit extra, partly zero-filled rows for
        # any frame whose index is not already 0..n-1.
        df = record.copy().reset_index(drop=True)

    if victim_aggregates is not None:
        for k, v in victim_aggregates.items():
            df[k] = v

    df = _add_engineered_features(df)

    n_rows = len(df)
    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
        for col in meta["numeric_features"]
    })
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per training level, in training order.
        block = block.reindex(
            columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0
        )
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
|
|
|
|
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the feature metadata to `path` as JSON (indent=2).

    Only the known keys are written. int_to_label keys are stringified
    because JSON object keys must be strings; a missing leakage_excluded
    entry is stored as an empty list. Raises KeyError if any other expected
    key is absent from `meta`.
    """
    copied_keys = (
        "feature_names",
        "numeric_features",
        "categorical_levels",
        "label_to_int",
    )
    payload = {key: meta[key] for key in copied_keys}
    payload["int_to_label"] = {
        str(idx): name for idx, name in meta["int_to_label"].items()
    }
    payload["leakage_excluded"] = meta.get("leakage_excluded", [])

    with open(path, "w") as handle:
        json.dump(payload, handle, indent=2)
|
|
|
|
def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Read metadata JSON from `path` and return it as a dict.

    The int_to_label keys, which JSON stores as strings, are converted back
    to int. Raises KeyError if the file has no int_to_label entry.
    """
    with open(path) as handle:
        loaded = json.load(handle)

    restored: dict[int, Any] = {}
    for raw_key, label in loaded["int_to_label"].items():
        restored[int(raw_key)] = label
    loaded["int_to_label"] = restored
    return loaded
|
|
|
|
def build_department_lookup(topology_path: str | Path) -> dict[int, dict]:
    """
    Build {department_id: {topology features}} for inference-time lookup.

    Reads the topology CSV and, for every row, maps int(department_id) to a
    dict of the topology numeric and categorical feature columns that exist
    in the file. If a department_id occurs more than once, the last row wins.
    """
    topo = pd.read_csv(topology_path)
    wanted = TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
    # Column availability is the same for every row, so filter once.
    present = [name for name in wanted if name in topo.columns]
    return {
        int(row["department_id"]): {name: row[name] for name in present}
        for _, row in topo.iterrows()
    }
|
|
|
|
if __name__ == "__main__":
    # Smoke run: build the feature matrix and print a short summary.
    import sys

    # Optional first CLI argument: directory containing the two CSVs.
    cli_args = sys.argv[1:]
    if cli_args:
        data_dir = Path(cli_args[0])
    else:
        data_dir = Path("/mnt/user-data/uploads")

    X, y, groups, meta = build_features(
        data_dir / "campaign_trajectories.csv",
        data_dir / "victim_topology.csv",
    )

    n_campaigns = groups.nunique()
    n_features = len(meta["feature_names"])
    label_counts = y.map(INT_TO_LABEL).value_counts()
    has_nan = X.isnull().any().any()

    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {n_campaigns} campaigns")
    print(f"n features: {n_features}")
    print(f"label distribution:\n{label_counts}")
    print(f"X has NaN: {has_nan}")
|
|