# cyb004-baseline-classifier / feature_engineering.py
# pradeep-xpert's picture
# Initial release: XGBoost + MLP for phishing campaign-phase classification
# 16be928 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB004 baseline classifier.
Predicts `campaign_phase` (7-class) from per-timestep phishing campaign
trajectory data on the CYB004 sample dataset.
CSV inputs:
campaign_trajectories.csv (primary, one row per timestep, 100
campaigns x ~40 timesteps = 3,952 rows)
victim_topology.csv (per-department victim configuration,
joined on target_department_id)
campaign_summary.csv (per-campaign aggregates; reserved for
future work)
campaign_events.csv (discrete event log; reserved for
future work)
Target classes (7 phases observed in the sample):
target_reconnaissance, infrastructure_setup, lure_crafting,
email_delivery, victim_engagement, credential_harvesting,
post_compromise_escalation
This is the email-security / SOC use case: given the observable
campaign telemetry at a moment in time, what phase of the phishing
lifecycle is the campaign in?
The pivot to campaign_phase (away from actor_capability_tier, the
README's headline use case) happened because per-campaign-constant
features (lure_personalisation_score, click_through_rate,
credential_submission_rate, target_department_id) leak tier via the
small test fold under group-aware splitting. With those features
removed, honest tier prediction is below majority baseline. The full
335k-row CYB004 dataset would address this; the sample does not.
See the model card for full discussion.
Public API
----------
build_features(trajectories_path, topology_path)
-> (X, y, groups, meta)
transform_single(record, meta, victim_aggregates=None) -> np.ndarray
save_meta(meta, path) / load_meta(path)
build_department_lookup(topology_path) -> dict
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching
the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Canonical ordering of the seven campaign phases. A phase's position in
# this list is the integer class index used for the target.
LABEL_ORDER = [
    "target_reconnaissance",
    "infrastructure_setup",
    "lure_crafting",
    "email_delivery",
    "victim_engagement",
    "credential_harvesting",
    "post_compromise_escalation",
]

# Phase name -> class index, and the inverse mapping.
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
# ---------------------------------------------------------------------------
# Identifier and target columns - not features
# ---------------------------------------------------------------------------
# Row identifiers and the prediction target; build_features() drops these
# before assembling the feature matrix.
ID_COLUMNS = ["campaign_id", "actor_id"]
TARGET_COLUMN = "campaign_phase"

# Columns removed because they give away the phase.
#
# `delivery_outcome` is excluded: its purity against phase is high (0.36),
# and `no_delivery` shows up only in early phases, so the column
# effectively encodes phase position and would act as a near-oracle.
#
# `actor_capability_tier` is deliberately NOT listed here. It is a real SOC
# observable (analysts typically hold an actor-cluster hypothesis) and its
# purity against phase is 0.18 versus a uniform baseline of 0.14, so it is
# not an oracle and stays in as a feature.
LEAKY_COLUMNS = ["delivery_outcome"]
# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
# Numeric trajectory columns passed through to the model unchanged.
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    # Strong but non-deterministic phase signal.
    "timestep",
    # Grows through the campaign; a useful position proxy.
    "emails_sent_cumulative",
    # Constant within a campaign; informative in combination with timestep.
    "click_through_rate",
    # Constant within a campaign.
    "credential_submission_rate",
    # Varies from step to step.
    "gateway_detection_score",
    # Constant within a campaign; carries tier signal.
    "lure_personalisation_score",
    # Constant within a campaign; treated as an ordinal ID.
    "target_department_id",
]

# Categorical trajectory columns, one-hot encoded by build_features().
CATEGORICAL_TIMESTEP_FEATURES = [
    # 6 levels including "none" (82%); active evasion correlates with
    # mid-to-late phases.
    "evasion_technique_active",
    # 4 levels; mostly constant within a campaign.
    "actor_capability_tier",
]
# ---------------------------------------------------------------------------
# Victim topology features (joined on target_department_id)
# ---------------------------------------------------------------------------
# Numeric columns taken from the victim topology table after it is joined
# onto the trajectories via target_department_id.
TOPOLOGY_NUMERIC_FEATURES = [
    "employee_count",
    "privileged_account_density",
    "mfa_enrollment_rate",
    "click_susceptibility_base",
    "email_volume_daily",
]

# Categorical columns from the same table; these are one-hot encoded.
TOPOLOGY_CATEGORICAL_FEATURES = [
    "department_type",
    "industry_sector",
    "awareness_training_level",
    "gateway_architecture",
    "dmarc_enforcement_level",
]
# ---------------------------------------------------------------------------
# Engineered features (none derived from phase or timestep alone)
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with six engineered columns appended.

    The input frame is not modified. It must contain the raw columns
    ``emails_sent_cumulative``, ``gateway_detection_score``,
    ``evasion_technique_active``, ``lure_personalisation_score``,
    ``credential_submission_rate`` and ``click_through_rate``; a missing
    column raises ``KeyError``.

    Added columns:
        log_emails_sent          log1p of emails_sent_cumulative, with
                                 negative counts clipped to 0 first.
        is_gateway_blocked_step  1 when gateway_detection_score > 0.7.
        is_evasion_active        1 when evasion_technique_active is present
                                 and is not the literal "none".
        is_high_personalisation  1 when lure_personalisation_score > 0.7.
        has_credential_capture   1 when credential_submission_rate > 0.
        has_user_engagement      1 when click_through_rate > 0.
    """
    out = df.copy()

    # emails_sent_cumulative is heavy-tailed (0 in recon, hundreds to
    # thousands by post_compromise), so compress it with log1p.
    emails = out["emails_sent_cumulative"].clip(lower=0)
    out["log_emails_sent"] = np.log1p(emails).astype(float)

    # A score above 0.7 is treated as a high-confidence gateway
    # intervention; the threshold itself is excluded.
    out["is_gateway_blocked_step"] = (out["gateway_detection_score"] > 0.7).astype(int)

    # A missing value compares unequal to "none", so on its own the
    # inequality would flag "no information" as "evasion active". Require
    # the value to be present as well, which keeps this flag consistent
    # with the all-zero one-hot block a missing value produces.
    evasion = out["evasion_technique_active"]
    out["is_evasion_active"] = (evasion.notna() & (evasion != "none")).astype(int)

    # Personalisation above 0.7 is treated as an APT-tier signature.
    out["is_high_personalisation"] = (out["lure_personalisation_score"] > 0.7).astype(int)

    # Any non-zero credential submission rate.
    out["has_credential_capture"] = (out["credential_submission_rate"] > 0).astype(int)

    # Any non-zero click-through rate.
    out["has_user_engagement"] = (out["click_through_rate"] > 0).astype(int)

    return out
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    trajectories_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training feature matrix from the trajectory and topology CSVs.

    Steps: load both CSVs, encode the target, drop identifier / target /
    leaky columns, left-join the topology on ``target_department_id``, add
    the engineered features, one-hot encode the categoricals, and fill any
    remaining missing values with 0.

    Parameters
    ----------
    trajectories_path : path to the per-timestep trajectory CSV.
    topology_path     : path to the per-department topology CSV.

    Returns
    -------
    X      : DataFrame of numeric features followed by one-hot blocks.
    y      : integer class index of ``campaign_phase`` (see LABEL_TO_INT).
    groups : ``campaign_id`` per row, aligned with X. Use it with
             GroupShuffleSplit / GroupKFold: one campaign yields ~40
             correlated timesteps, so row-level random splitting inflates
             metrics.
    meta   : dict with ``feature_names``, ``numeric_features``,
             ``categorical_levels``, ``label_to_int``, ``int_to_label`` and
             ``leakage_excluded``; this is what transform_single() and
             save_meta() consume.

    Raises
    ------
    ValueError
        If ``campaign_phase`` contains a value not in LABEL_TO_INT.
    pandas.errors.MergeError
        If ``department_id`` is not unique in the topology file. Without
        this check a duplicated department would duplicate trajectory rows
        and X would no longer line up with y and groups.
    """
    traj = pd.read_csv(trajectories_path)
    topo = pd.read_csv(topology_path)

    # Encode the target first; unmapped labels come back as NaN.
    y = traj[TARGET_COLUMN].map(LABEL_TO_INT)
    unknown = y.isna()
    if unknown.any():
        bad = traj.loc[unknown, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown campaign_phase values: {bad}")
    y = y.astype(int)

    # Capture the grouping key before the identifier columns are dropped.
    groups = traj["campaign_id"].copy()
    traj = traj.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS,
        errors="ignore",
    )

    topo_cols_needed = (
        ["department_id"]
        + TOPOLOGY_NUMERIC_FEATURES
        + TOPOLOGY_CATEGORICAL_FEATURES
    )
    # many_to_one: every trajectory row must match at most one department,
    # so the row count (and alignment with y / groups) is preserved.
    traj = traj.merge(
        topo[topo_cols_needed],
        left_on="target_department_id",
        right_on="department_id",
        how="left",
        validate="many_to_one",
    ).drop(columns=["department_id"], errors="ignore")

    traj = _add_engineered_features(traj)

    numeric_features = (
        DIRECT_NUMERIC_TIMESTEP_FEATURES
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "log_emails_sent", "is_gateway_blocked_step", "is_evasion_active",
            "is_high_personalisation", "has_credential_capture", "has_user_engagement",
        ]
    )
    X_numeric = traj[numeric_features].astype(float)

    # One-hot encode each categorical against its sorted observed levels.
    # The levels are recorded in meta so inference can rebuild the same
    # columns in the same order.
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_TIMESTEP_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES:
        if col not in traj.columns:
            continue
        levels = sorted(traj[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            traj[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    # Hand out copies of the module-level mappings so a caller mutating
    # meta cannot alter the shared constants.
    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": list(numeric_features),
        "categorical_levels": categorical_levels,
        "label_to_int": dict(LABEL_TO_INT),
        "int_to_label": dict(INT_TO_LABEL),
        "leakage_excluded": list(LEAKY_COLUMNS),
    }
    return X, y, groups, meta
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    victim_aggregates: dict | None = None,
) -> np.ndarray:
    """
    Encode one or more timestep records for inference.

    Parameters
    ----------
    record : a dict for a single timestep, or a DataFrame of timesteps.
        The caller's object is not modified. The raw columns needed by the
        engineered features must be present.
    meta : feature metadata as produced by build_features() / load_meta();
        ``numeric_features``, ``categorical_levels`` and ``feature_names``
        are read.
    victim_aggregates : optional mapping of column name -> value that is
        written onto every row before encoding (for example one entry of
        build_department_lookup()).

    Returns
    -------
    float32 array with one row per input record and one column per entry
    of ``meta["feature_names"]``, in that order. Numeric features absent
    from the record, missing values, and categorical values outside the
    recorded levels are all encoded as 0.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Use a positional index. The blocks below are concatenated
        # column-wise, and pd.concat aligns on index labels: a frame sliced
        # from a larger table (index not 0..n-1) would otherwise fail to
        # line up and yield extra, zero-filled rows.
        df = record.copy().reset_index(drop=True)

    if victim_aggregates is not None:
        for key, value in victim_aggregates.items():
            df[key] = value

    df = _add_engineered_features(df)
    n_rows = len(df)

    # Numeric block: a feature missing from the record becomes a column of 0.
    numeric = pd.DataFrame(
        {
            col: (
                df[col].astype(float).to_numpy()
                if col in df.columns
                else np.zeros(n_rows, dtype=float)
            )
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        if col in df.columns:
            values = df[col]
        else:
            values = pd.Series([None] * n_rows, index=df.index, dtype=object)
        block = pd.get_dummies(
            values.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Pin the columns to exactly the recorded levels, in recorded order.
        block = block.reindex(
            columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0,
        )
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final guard: match the training column order, adding any column the
    # blocks did not produce as zeros.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the persistable part of ``meta`` to ``path`` as indented JSON.

    Only the known keys are written; anything else in ``meta`` is ignored.
    ``int_to_label`` keys are stringified, and ``leakage_excluded``
    defaults to an empty list when absent. A missing required key raises
    ``KeyError`` before the file is opened.
    """
    required_keys = (
        "feature_names",
        "numeric_features",
        "categorical_levels",
        "label_to_int",
    )
    payload = {key: meta[key] for key in required_keys}
    # JSON object keys are strings, so make the conversion explicit here;
    # load_meta() turns them back into ints.
    payload["int_to_label"] = {
        str(index): label for index, label in meta["int_to_label"].items()
    }
    payload["leakage_excluded"] = meta.get("leakage_excluded", [])

    with open(path, "w") as handle:
        json.dump(payload, handle, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Load feature metadata previously written by save_meta().

    The file is decoded as UTF-8 explicitly rather than with the platform
    default encoding, so a metadata file containing non-ASCII text (for
    example a category level) loads the same way on every platform.

    JSON object keys are always strings, so the ``int_to_label`` keys are
    converted back to ``int`` before returning. All other entries are
    returned as parsed.
    """
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {
        int(index): label for index, label in meta["int_to_label"].items()
    }
    return meta
def build_department_lookup(topology_path: str | Path) -> dict[int, dict]:
    """
    Read the topology CSV and index its feature columns by department.

    Returns ``{department_id: {column: value}}`` covering whichever of the
    topology numeric and categorical feature columns exist in the file.
    Keys are coerced with ``int()``. If a department_id occurs more than
    once, the last row wins.
    """
    topo = pd.read_csv(topology_path)
    wanted = TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
    # The set of available columns is the same for every row; resolve it once.
    present = [name for name in wanted if name in topo.columns]
    return {
        int(row["department_id"]): {name: row[name] for name in present}
        for _, row in topo.iterrows()
    }
if __name__ == "__main__":
    # Smoke test: build the features from a data directory and print a
    # short summary. The directory is the first CLI argument when given,
    # otherwise the hard-coded default below.
    import sys

    data_dir = Path(sys.argv[1] if len(sys.argv) > 1 else "/mnt/user-data/uploads")
    X, y, groups, meta = build_features(
        data_dir / "campaign_trajectories.csv",
        data_dir / "victim_topology.csv",
    )

    summary_lines = [
        f"X shape: {X.shape}",
        f"y shape: {y.shape}",
        f"groups: {groups.nunique()} campaigns",
        f"n features: {len(meta['feature_names'])}",
        f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}",
        # Expected to be False: build_features() fills missing values with 0.
        f"X has NaN: {X.isnull().any().any()}",
    ]
    for line in summary_lines:
        print(line)