# cyb007-baseline-classifier / feature_engineering.py
# pradeep-xpert's picture
# Initial release: XGBoost + MLP for insider threat type classification
# ed9d6a1 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB007 baseline classifier.
Predicts `actor_threat_type` (3-class: negligent_user / malicious_employee
/ privileged_insider) from per-timestep insider threat trajectory data on
the CYB007 sample dataset.
CSV inputs:
    insider_trajectories.csv  (primary, per-timestep; 500 incidents x 65
                              timesteps = 32,500 rows)
    incident_summary.csv      (per-incident aggregates; reserved for
                              future work)
    incident_events.csv       (discrete incident event log; reserved for
                              future work - 191 collusion records out of
                              38,687 events)
    org_topology.csv          (per-department defender configuration;
                              joinable to events but not directly to
                              per-timestep trajectories without a
                              department key on the trajectory row)
Target classes (3):
negligent_user, malicious_employee, privileged_insider
The CYB007 README claims 4 actor tiers (adds compromised_account) but
the sample data contains only 3. We train on the 3 that exist.
Sample-size note
----------------
500 incidents with 65 timesteps each is the same volume profile as
CYB005 (500 campaigns × 75 timesteps). At this scale, group-aware
splitting yields ~75 test incidents (~11-25 per tier), which is enough
to learn tier attribution honestly. CYB003/4/6 pivoted away from the
README's stated tier-attribution headline because their samples had
only 100 groups; CYB007 ships the headline use case.
Leakage audit
-------------
Two features have strongly tier-correlated means but with substantial
distributional overlap:
- data_access_volume_mb: privileged 0-2541, malicious 0-328,
negligent 0-88. Overlap region [0, 88] covers most timesteps for all
three tiers (median ~9 MB each). Real observable, not oracle. KEPT.
- exfiltration_volume_mb_cumulative: similar shape, overlap [0, ~5].
Real observable. KEPT.
Removing both features drops accuracy from 0.85 to 0.47 (below
majority). This confirms they are not oracles - they carry legitimate
discriminative signal that defines what privileged_insider means.
`detection_outcome` is near-oracle for incident_phase (purity 0.79,
max 1.00 for reconnaissance). For TIER prediction it has no oracle
relationship (purity vs tier is uniform around 0.50). KEPT.
No columns dropped for this task.
Public API
----------
build_features(trajectories_path) -> (X, y, groups, meta)
transform_single(record, meta) -> np.ndarray
save_meta(meta, path) / load_meta(path)
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Class labels, listed roughly from least to most access/sophistication.
# The position of a label in this list is its integer class id.
# The CYB007 README claims a 4th tier 'compromised_account' but the sample
# data contains only 3.
LABEL_ORDER = [
    "negligent_user",
    "malicious_employee",
    "privileged_insider",
]
# Forward (label -> class id) and reverse (class id -> label) lookups.
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
# Columns that identify a row rather than describe it; removed before
# feature construction.
ID_COLUMNS = [
    "incident_id",
    "actor_id",
]
# Column holding the class label to predict.
TARGET_COLUMN = "actor_threat_type"
# Columns excluded for leakage. Intentionally empty: no columns are dropped
# for this task (see the "Leakage audit" section of the module docstring).
LEAKY_COLUMNS: list[str] = []
# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
# Raw numeric columns that are passed through to the feature matrix as-is.
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    # position in 65-step lifecycle
    "timestep",
    "data_access_volume_mb",
    "privilege_event_count",
    "communication_anomaly_score",
    "dlp_confidence_score",
    "exfiltration_volume_mb_cumulative",
    "behavioural_risk_score",
]

# Raw categorical columns that are expanded into one-hot indicator columns.
CATEGORICAL_TIMESTEP_FEATURES = [
    # 8 values
    "incident_phase",
    # 4 values
    "detection_outcome",
    # 3 values
    "target_data_sensitivity_tier",
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Six engineered features encoding tier-discriminative hypotheses.
Each composite would be computed by a security analyst by hand.
"""
df = df.copy()
# 1. Log-scaled data volume. data_access_volume_mb is heavy-tailed
# (median ~9 MB, max ~2541 MB for privileged insiders). log1p
# compresses for both XGBoost and MLP.
df["log_data_volume"] = np.log1p(
df["data_access_volume_mb"].clip(lower=0)
).astype(float)
# 2. Log-scaled cumulative exfiltration. Same heavy-tail shape.
df["log_cumulative_exfil"] = np.log1p(
df["exfiltration_volume_mb_cumulative"].clip(lower=0)
).astype(float)
# 3. Exfil velocity: cumulative exfil per timestep elapsed.
# High = aggressive exfiltration; low = patient or accidental.
df["exfil_velocity"] = (
df["exfiltration_volume_mb_cumulative"]
/ df["timestep"].clip(lower=1)
).astype(float)
# 4. Privileged event indicator. privilege_event_count > 0 marks
# timesteps with privileged operations. Strong privileged_insider
# signature.
df["is_privileged_event"] = (df["privilege_event_count"] > 0).astype(int)
# 5. Risk x DLP composite. Combines behavioural risk score with
# DLP confidence - high values indicate both behavioural anomaly
# AND DLP-recognised risk pattern.
df["risk_x_dlp_composite"] = (
df["behavioural_risk_score"] * df["dlp_confidence_score"]
).astype(float)
# 6. Late-stage indicator. Timesteps after 40 sit in cover_tracks /
# incident_resolution / late exfiltration_attempt; tier signal
# differs across these late phases.
df["is_late_stage"] = (df["timestep"] > 40).astype(int)
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    trajectories_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training matrix from the per-timestep trajectories CSV.

    Steps: read the CSV, integer-encode the target, set aside the incident
    ids, drop identifier / target / leaky columns, add the engineered
    columns, one-hot encode the categoricals, and fill remaining NaNs with 0.

    Returns ``(X, y, groups, meta)``:
      * ``X``      - float numeric columns followed by 0/1 one-hot columns.
      * ``y``      - integer class ids according to ``LABEL_TO_INT``.
      * ``groups`` - the ``incident_id`` of every row, aligned with ``X``.
                     Pass it to GroupShuffleSplit / GroupKFold so that train
                     and test never share an incident; the timesteps of one
                     incident are highly correlated.
      * ``meta``   - column names and category levels needed to encode new
                     records the same way (see ``transform_single``).

    Raises ValueError if the target column holds a label that is not in
    ``LABEL_TO_INT``.
    """
    raw = pd.read_csv(trajectories_path)

    # Labels outside LABEL_TO_INT map to NaN; fail loudly instead of
    # training on a silently corrupted target.
    encoded_target = raw[TARGET_COLUMN].map(LABEL_TO_INT)
    unmapped = encoded_target.isna()
    if unmapped.any():
        bad = raw.loc[unmapped, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown actor_threat_type values: {bad}")
    y = encoded_target.astype(int)

    # Captured before the identifier columns are dropped below.
    groups = raw["incident_id"].copy()

    non_feature_columns = ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS
    frame = _add_engineered_features(
        raw.drop(columns=non_feature_columns, errors="ignore")
    )

    engineered_names = [
        "log_data_volume", "log_cumulative_exfil", "exfil_velocity",
        "is_privileged_event", "risk_x_dlp_composite", "is_late_stage",
    ]
    numeric_features = DIRECT_NUMERIC_TIMESTEP_FEATURES + engineered_names
    parts: list[pd.DataFrame] = [
        frame[numeric_features].astype(float).reset_index(drop=True)
    ]

    categorical_levels: dict[str, list[str]] = {}
    for col in CATEGORICAL_TIMESTEP_FEATURES:
        # Categorical columns that the CSV does not contain are skipped.
        if col in frame.columns:
            levels = sorted(frame[col].dropna().unique().tolist())
            categorical_levels[col] = levels
            # Pinning the category list fixes the set and order of the
            # generated indicator columns.
            as_categorical = (
                frame[col].astype("category").cat.set_categories(levels)
            )
            one_hot = pd.get_dummies(
                as_categorical, prefix=col, dummy_na=False,
            ).astype(int)
            parts.append(one_hot.reset_index(drop=True))

    X = pd.concat(parts, axis=1).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "leakage_excluded": LEAKY_COLUMNS,
    }
    return X, y, groups, meta
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """Encode one record (dict) or several records (DataFrame) for inference.

    Parameters
    ----------
    record:
        A dict describing one timestep, or a DataFrame with one row per
        timestep. The caller's object is not modified. A DataFrame's index
        is ignored: output rows follow the positional order of the input.
    meta:
        The metadata produced by ``build_features`` (or ``load_meta``). The
        keys read here are ``numeric_features``, ``categorical_levels`` and
        ``feature_names``.

    Returns
    -------
    np.ndarray
        float32 array of shape ``(n_rows, len(meta["feature_names"]))`` with
        columns in ``meta["feature_names"]`` order.

    Notes
    -----
    * Numeric features missing from the engineered frame are encoded as 0.0;
      NaN values are also replaced by 0.0.
    * A categorical value that is not among the stored levels (or a missing
      categorical column) yields all-zero indicator columns.
    * Raw columns read by ``_add_engineered_features`` must be present,
      otherwise that helper raises KeyError.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Discard the caller's index. Every block below is built on
        # ``df.index``; if the blocks carried different indices (e.g. rows
        # sliced out of a test split), the axis=1 concat would outer-join
        # them and emit extra, misaligned rows.
        df = record.reset_index(drop=True)
    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        if col in df.columns:
            values = df[col]
        else:
            # Column absent at inference time: encode as "no level set".
            values = pd.Series([None] * n_rows, index=df.index, dtype=object)
        one_hot = pd.get_dummies(
            values.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per stored level, in the stored order.
        expected_columns = [f"{col}_{lvl}" for lvl in levels]
        blocks.append(one_hot.reindex(columns=expected_columns, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final guard: match the training column set and order exactly.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the feature metadata to ``path`` as indented JSON.

    Only the six known keys are written; any other key in ``meta`` is
    ignored. ``int_to_label`` keys are converted to strings because JSON
    object keys must be strings (``load_meta`` converts them back). A
    missing ``leakage_excluded`` entry is written as an empty list.

    Raises KeyError if one of the other five keys is missing from ``meta``.
    """
    copied_verbatim = (
        "feature_names",
        "numeric_features",
        "categorical_levels",
        "label_to_int",
    )
    payload: dict[str, Any] = {key: meta[key] for key in copied_verbatim}
    payload["int_to_label"] = {
        str(class_id): label
        for class_id, label in meta["int_to_label"].items()
    }
    payload["leakage_excluded"] = meta.get("leakage_excluded", [])

    with open(path, "w") as handle:
        json.dump(payload, handle, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by ``save_meta`` and restore integer class ids.

    JSON stores object keys as strings, so the keys of ``int_to_label`` are
    converted back to ``int``. All other entries are returned as parsed.

    Raises KeyError if the file has no ``int_to_label`` entry.
    """
    with open(path) as handle:
        meta = json.loads(handle.read())

    restored: dict[int, Any] = {}
    for class_id, label in meta["int_to_label"].items():
        restored[int(class_id)] = label
    meta["int_to_label"] = restored
    return meta
if __name__ == "__main__":
    # Smoke test: build the feature matrix and print a short summary.
    import sys

    # The optional first CLI argument is the directory holding the CSVs.
    if len(sys.argv) > 1:
        base = Path(sys.argv[1])
    else:
        base = Path("/mnt/user-data/uploads")

    X, y, groups, meta = build_features(base / "insider_trajectories.csv")

    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} incidents")
    print(f"n_features: {len(meta['feature_names'])}")
    label_counts = y.map(INT_TO_LABEL).value_counts()
    print(f"label distribution:\n{label_counts}")
    # build_features fills NaNs with 0, so this reports whether any remain.
    has_nan = X.isnull().any().any()
    print(f"X has NaN: {has_nan}")