| """ |
| feature_engineering.py |
| ====================== |
| |
| Feature pipeline for the CYB007 baseline classifier. |
| |
| Predicts `actor_threat_type` (3-class: negligent_user / malicious_employee |
| / privileged_insider) from per-timestep insider threat trajectory data on |
| the CYB007 sample dataset. |
| |
| CSV inputs: |
| insider_trajectories.csv (primary, per-timestep, 500 incidents x 65 |
| timesteps = 32,500 rows) |
| incident_summary.csv (per-incident aggregates; reserved for |
| future work) |
| incident_events.csv (discrete incident event log; reserved |
| for future work - 191 collusion records |
| out of 38,687 events) |
| org_topology.csv (per-department defender configuration; |
| joinable to events but not directly to |
| per-timestep trajectories without a |
| department key on the trajectory row) |
| |
| Target classes (3): |
| negligent_user, malicious_employee, privileged_insider |
| |
| The CYB007 README claims 4 actor tiers (adds compromised_account) but |
| the sample data contains only 3. We train on the 3 that exist. |
| |
| Sample-size note |
| ---------------- |
| 500 incidents with 65 timesteps each is the same volume profile as |
| CYB005 (500 campaigns × 75 timesteps). At this scale, group-aware |
| splitting yields ~75 test incidents (~11-25 per tier), which is enough |
| to learn tier attribution honestly. CYB003/4/6 pivoted away from the |
| README's stated tier-attribution headline because their samples had |
| only 100 groups; CYB007 ships the headline use case. |
| |
| Leakage audit |
| ------------- |
| Two features have strongly tier-correlated means but with substantial |
| distributional overlap: |
| - data_access_volume_mb: privileged 0-2541, malicious 0-328, |
| negligent 0-88. Overlap region [0, 88] covers most timesteps for all |
| three tiers (median ~9 MB each). Real observable, not oracle. KEPT. |
| - exfiltration_volume_mb_cumulative: similar shape, overlap [0, ~5]. |
| Real observable. KEPT. |
| |
| Removing both features drops accuracy from 0.85 to 0.47 (below |
| majority). This confirms they are not oracles - they carry legitimate |
| discriminative signal that defines what privileged_insider means. |
| |
| `detection_outcome` is near-oracle for incident_phase (purity 0.79, |
| max 1.00 for reconnaissance). For TIER prediction it has no oracle |
| relationship (purity vs tier is uniform around 0.50). KEPT. |
| |
| No columns dropped for this task. |
| |
| Public API |
| ---------- |
| build_features(trajectories_path) -> (X, y, groups, meta) |
| transform_single(record, meta) -> np.ndarray |
| save_meta(meta, path) / load_meta(path) |
| |
| License |
| ------- |
| Ships with the public model on Hugging Face under CC-BY-NC-4.0, |
| matching the dataset license. See README.md. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
| |
| |
|
|
| |
| |
# Target classes in their canonical order; the list position of each label
# is the integer code the classifier is trained on.
LABEL_ORDER = ["negligent_user", "malicious_employee", "privileged_insider"]

# Forward (label -> code) and reverse (code -> label) lookups, both derived
# from LABEL_ORDER so the three objects cannot drift apart.
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
|
|
| |
| |
| |
|
|
# Identifier columns: removed from the feature matrix by build_features
# (incident_id is returned separately as the grouping key).
ID_COLUMNS = ["incident_id", "actor_id"]

# Column holding the class label to predict.
TARGET_COLUMN = "actor_threat_type"

# Columns excluded as target leakage. Empty for this task; the list is still
# dropped by build_features and recorded in meta as "leakage_excluded".
LEAKY_COLUMNS: list[str] = []

# Per-timestep numeric columns passed through to the model as floats.
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",
    "data_access_volume_mb",
    "privilege_event_count",
    "communication_anomaly_score",
    "dlp_confidence_score",
    "exfiltration_volume_mb_cumulative",
    "behavioural_risk_score",
]

# Per-timestep categorical columns that are one-hot encoded.
CATEGORICAL_TIMESTEP_FEATURES = [
    "incident_phase",
    "detection_outcome",
    "target_data_sensitivity_tier",
]
|
|
|
|
| |
| |
| |
|
|
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with six engineered columns appended.

    The input frame is never modified. Columns added, in order:
      log_data_volume       log1p of data_access_volume_mb (clipped at 0)
      log_cumulative_exfil  log1p of exfiltration_volume_mb_cumulative
                            (clipped at 0)
      exfil_velocity        cumulative exfiltration divided by timestep
      is_privileged_event   1 when privilege_event_count > 0, else 0
      risk_x_dlp_composite  behavioural_risk_score * dlp_confidence_score
      is_late_stage         1 when timestep > 40, else 0

    Raises KeyError if any of the source columns is absent.
    """
    out = df.copy()

    access_mb = out["data_access_volume_mb"]
    exfil_mb = out["exfiltration_volume_mb_cumulative"]

    # Negative volumes are clipped to 0 before the log transform.
    out["log_data_volume"] = np.log1p(access_mb.clip(lower=0)).astype(float)
    out["log_cumulative_exfil"] = np.log1p(exfil_mb.clip(lower=0)).astype(float)

    # The divisor is floored at 1 so timestep 0 cannot divide by zero.
    step = out["timestep"]
    velocity = exfil_mb / step.clip(lower=1)
    out["exfil_velocity"] = velocity.astype(float)

    had_privilege_event = out["privilege_event_count"] > 0
    out["is_privileged_event"] = had_privilege_event.astype(int)

    risk_times_dlp = out["behavioural_risk_score"] * out["dlp_confidence_score"]
    out["risk_x_dlp_composite"] = risk_times_dlp.astype(float)

    past_step_40 = step > 40
    out["is_late_stage"] = past_step_40.astype(int)

    return out
|
|
|
|
| |
| |
| |
|
|
def build_features(
    trajectories_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Read the trajectories CSV and turn it into a model-ready design matrix.

    Steps: encode the target as integers, set aside incident_id as the
    grouping key, drop identifier / target / leaky columns, append the
    engineered features, one-hot encode the categorical columns, and fill
    any remaining NaN with 0.0.

    Returns (X, y, groups, meta):
      X       feature DataFrame (numeric columns first, then one-hot blocks)
      y       integer-coded target Series
      groups  incident_id Series aligned with X. Use it with
              GroupShuffleSplit / GroupKFold so train and test sets contain
              disjoint incidents - each incident generates 65
              highly-correlated timesteps.
      meta    dict with the feature names, numeric feature list, categorical
              levels and label mappings needed to encode records later.

    Raises ValueError if the target column holds a value missing from
    LABEL_TO_INT.
    """
    raw = pd.read_csv(trajectories_path)

    # Encode the target; anything that fails to map is reported up front.
    encoded = raw[TARGET_COLUMN].map(LABEL_TO_INT)
    unmapped = encoded.isna()
    if unmapped.any():
        bad = raw.loc[unmapped, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown actor_threat_type values: {bad}")
    y = encoded.astype(int)
    groups = raw["incident_id"].copy()

    # Strip non-feature columns, then append the engineered features.
    non_features = ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS
    traj = _add_engineered_features(
        raw.drop(columns=non_features, errors="ignore")
    )

    engineered = [
        "log_data_volume", "log_cumulative_exfil", "exfil_velocity",
        "is_privileged_event", "risk_x_dlp_composite", "is_late_stage",
    ]
    numeric_features = DIRECT_NUMERIC_TIMESTEP_FEATURES + engineered

    # Numeric block first; one one-hot block per categorical column follows.
    parts: list[pd.DataFrame] = [
        traj[numeric_features].astype(float).reset_index(drop=True)
    ]
    categorical_levels: dict[str, list[str]] = {}
    for col in CATEGORICAL_TIMESTEP_FEATURES:
        if col in traj.columns:
            # Sorted observed levels fix the dummy-column order and are
            # stored in meta so inference can reproduce it.
            levels = sorted(traj[col].dropna().unique().tolist())
            categorical_levels[col] = levels
            as_category = traj[col].astype("category").cat.set_categories(levels)
            dummies = pd.get_dummies(as_category, prefix=col, dummy_na=False)
            parts.append(dummies.astype(int).reset_index(drop=True))

    X = pd.concat(parts, axis=1).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "leakage_excluded": LEAKY_COLUMNS,
    }
    return X, y, groups, meta
|
|
|
|
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """
    Encode one timestep record (or a frame of records) for inference.

    Parameters
    ----------
    record : a dict for a single timestep, or a DataFrame with one row per
        timestep. The raw columns read by `_add_engineered_features` must be
        present. Any other numeric feature that is absent is filled with
        0.0, and an absent categorical column encodes as all zeros.
    meta : metadata produced by `build_features` (or `load_meta`); the keys
        "numeric_features", "categorical_levels" and "feature_names" are
        used.

    Returns
    -------
    float32 array of shape (n_rows, len(meta["feature_names"])), with
    columns in the training order.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Drop the caller's index. The blocks below are concatenated with
        # axis=1, which aligns on index labels; a frame sliced from a larger
        # one (index not 0..n-1) would otherwise be outer-joined against the
        # positional numeric block and come back with extra zero-filled rows.
        # reset_index returns a new frame, so the input is not modified.
        df = record.reset_index(drop=True)

    df = _add_engineered_features(df)
    n_rows = len(df)

    # Numeric block, explicitly sharing df's index so every block lines up.
    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        values = df.get(col, pd.Series([None] * n_rows))
        # Restricting to the training levels makes unseen values encode as
        # all zeros for this column.
        dummies = pd.get_dummies(
            values.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per training level, in training order.
        expected = [f"{col}_{lvl}" for lvl in levels]
        blocks.append(dummies.reindex(columns=expected, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
|
|
|
|
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write feature metadata to `path` as indented JSON.

    Only the keys needed to encode records later are persisted. The integer
    keys of "int_to_label" are written as strings because JSON object keys
    must be strings; `load_meta` converts them back. "leakage_excluded"
    defaults to an empty list when missing from `meta`.

    Raises KeyError if a required key is missing from `meta`.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "leakage_excluded": meta.get("leakage_excluded", []),
    }
    # Explicit UTF-8 so the file does not depend on the platform's default
    # locale encoding.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)
|
|
|
|
def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Read feature metadata from the JSON file at `path`.

    The keys of "int_to_label" are converted from strings back to integers,
    since JSON object keys are always strings.

    Raises KeyError if the file has no "int_to_label" entry.
    """
    # JSON text is UTF-8; decode it explicitly so non-ASCII level or label
    # names are not mangled on platforms whose default encoding differs.
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta
|
|
|
|
if __name__ == "__main__":
    # Smoke test: build the feature matrix from the directory given as the
    # first CLI argument (or the default upload directory) and print a
    # summary of the result.
    import sys

    if len(sys.argv) > 1:
        base = Path(sys.argv[1])
    else:
        base = Path("/mnt/user-data/uploads")

    X, y, groups, meta = build_features(base / "insider_trajectories.csv")

    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} incidents")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")
|
|