| """ |
| feature_engineering.py |
| ====================== |
| |
| Feature pipeline for the CYB005 baseline classifier. |
| |
| Predicts `actor_capability_tier` (4-class) from per-timestep ransomware |
| campaign telemetry on the CYB005 sample dataset. |
| |
| CSV inputs: |
| attack_timelines.csv (primary, one row per timestep; 500 campaigns |
| x ~75 timesteps, 37,489 rows in total) |
| victim_topology.csv (per-segment defender configuration, joined |
| on target_segment_id; one row per segment) |
| campaign_summary.csv (per-campaign aggregates; reserved for future |
| work - many fields are post-hoc outcomes that |
| would leak the tier through training) |
| campaign_events.csv (discrete event log; reserved for future work) |
| |
| Target classes (4): |
| lone_actor, organised_syndicate, raas_affiliate, nation_state_nexus |
| |
| Sample size note |
| ---------------- |
| CYB005's sample is intentionally larger than its sister datasets (500 |
| campaigns vs 100 in CYB002/3/4). The README states this is because |
| "benchmarks are conditional on small actor-tier subsets". The larger |
| sample makes tier attribution genuinely learnable here, where it was |
| not in CYB003/CYB004. |
| |
| Leakage audit |
| ------------- |
| Three columns inspected for tier leakage: |
| - `attribution_risk_score` - mean 0.016-0.026 across tiers, ranges |
| overlap heavily. NOT an oracle; keep. |
| - `living_off_land_score` - mean 0.05 (lone) to 0.20 (nation_state), |
| with substantial overlap (std 0.08-0.25). Real observable, not |
| an oracle; keep. |
| - `attack_phase` - 89% purity vs `detection_outcome` (recovery_in_progress |
| is a 1:1 alias), but for TIER prediction it has no oracle relationship. |
| Keep. |
| |
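| No feature columns are dropped as leaky for tier prediction (LEAKY_COLUMNS |
| is empty); only the identifier columns and the target itself are removed. |
| The model is trained on what a SOC analyst would actually see at |
| observation time. |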
| |
| Public API |
| ---------- |
| build_features(timelines_path, topology_path) |
| -> (X, y, groups, meta) |
| transform_single(record, meta, segment_aggregates=None) -> np.ndarray |
| save_meta(meta, path) / load_meta(path) |
| build_segment_lookup(topology_path) -> dict |
| |
| License |
| ------- |
| Ships with the public model on Hugging Face under CC-BY-NC-4.0, |
| matching the dataset license. See README.md. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
| |
| |
|
|
| |
| |
| |
# Target classes. A label's position in this list is its integer class id.
LABEL_ORDER = [
    "lone_actor",
    "organised_syndicate",
    "raas_affiliate",
    "nation_state_nexus",
]
# Reverse (id -> label) and forward (label -> id) lookups, both derived from
# LABEL_ORDER so the three constants can never disagree.
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
LABEL_TO_INT = {label: idx for idx, label in INT_TO_LABEL.items()}
|
|
| |
| |
| |
|
|
# Identifier columns; build_features removes them from the feature frame.
ID_COLUMNS = [
    "campaign_id",
    "actor_id",
]
# Column holding the class label to predict.
TARGET_COLUMN = "actor_capability_tier"

# Columns excluded because they would leak the target. Empty for tier
# prediction: the leakage audit in the module docstring keeps every
# inspected column.
LEAKY_COLUMNS: list[str] = []
|
|
| |
| |
| |
|
|
# Timeline columns used as-is; build_features casts them to float.
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",
    "files_encrypted_cumulative",
    "encryption_throughput_mbps",
    "endpoints_compromised",
    "lateral_move_count",
    "credential_harvest_count",
    "c2_bytes_exfiltrated",
    "defender_alert_score",
    "blast_radius_pct",
    "living_off_land_score",
    "attribution_risk_score",
    "data_exfiltrated_gb",
    "wiper_flag",
    "double_extortion_flag",
    "ir_activated",
]

# Timeline columns that are one-hot encoded.
CATEGORICAL_TIMESTEP_FEATURES = [
    "attack_phase",
    "detection_outcome",
]
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
# Topology columns joined onto each timestep (via target_segment_id) and
# used as numeric features.
TOPOLOGY_NUMERIC_FEATURES = [
    "edr_coverage_rate",
    "network_segmentation_quality",
    "patch_posture_score",
    "ir_activation_latency_hrs",
    "endpoint_count",
    "ad_domain_complexity",
    "soc_maturity_score",
    "backup_recovery_prob",
    "backup_recovery_hrs_mean",
    "siem_rule_refresh_cadence_days",
]

# Topology columns joined the same way and one-hot encoded.
TOPOLOGY_CATEGORICAL_FEATURES = [
    "segment_type",
    "soc_maturity_tier",
    "backup_maturity_tier",
]
|
|
|
|
| |
| |
| |
|
|
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with six engineered feature columns appended.

    The input frame is not modified. Columns added:

      c2_intensity_score   log1p(c2_bytes_exfiltrated * encryption_throughput_mbps),
                           both factors clipped at 0 first
      escalation_velocity  lateral_move_count / max(timestep, 1)
      is_destructive       1 if wiper_flag == 1 or double_extortion_flag == 1
      dwell_efficiency     blast_radius_pct / max(timestep, 1)
      is_post_detonation   1 if timestep > 50
      lotl_intensity_bin   living_off_land_score bucketed into 0..3 with
                           right-closed edges at 0.1, 0.3 and 0.6

    A missing `living_off_land_score` is placed in bin 0 and a score outside
    [0, 1] falls into the nearest outer bin, so bad rows no longer abort the
    whole transform.

    Raises KeyError if any raw column read here is absent from `df`.
    """
    df = df.copy()

    # Timestep floored at 1 so the two per-timestep rates never divide by zero.
    elapsed = df["timestep"].clip(lower=1)

    # Negative inputs are clipped to 0 so log1p never sees a value below 0.
    c2_product = (
        df["c2_bytes_exfiltrated"].clip(lower=0)
        * df["encryption_throughput_mbps"].clip(lower=0)
    )
    df["c2_intensity_score"] = np.log1p(c2_product).astype(float)

    df["escalation_velocity"] = (df["lateral_move_count"] / elapsed).astype(float)

    df["is_destructive"] = (
        (df["wiper_flag"] == 1) | (df["double_extortion_flag"] == 1)
    ).astype(int)

    df["dwell_efficiency"] = (df["blast_radius_pct"] / elapsed).astype(float)

    df["is_post_detonation"] = (df["timestep"] > 50).astype(int)

    # Open-ended outer edges keep every finite score inside a bin; the inner
    # edges (and therefore the bin of every score in [0, 1]) are unchanged.
    # labels=False yields integer bin codes, NaN for a missing score; those
    # are filled with 0 before the integer cast, which would otherwise raise.
    lotl_bins = pd.cut(
        df["living_off_land_score"],
        bins=[-np.inf, 0.1, 0.3, 0.6, np.inf],
        labels=False,
    )
    df["lotl_intensity_bin"] = lotl_bins.fillna(0).astype(int)

    return df
|
|
|
|
| |
| |
| |
|
|
def build_features(
    timelines_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load CSVs, join topology, drop target + identifiers, engineer features,
    one-hot encode, return (X, y, groups, meta).

    `timelines_path` is the per-timestep CSV and `topology_path` the
    per-segment CSV; topology rows are left-joined onto each timestep via
    target_segment_id == segment_id.

    Returns
      X       feature frame, one row per timeline row, remaining NaN set to 0
      y       integer class ids (see LABEL_TO_INT), aligned with X by position
      groups  campaign_id values aligned with X. Use it with
              GroupShuffleSplit / GroupKFold so train and test sets contain
              disjoint campaigns - timesteps of one campaign are highly
              correlated.
      meta    dict describing the encoding: feature_names, numeric_features,
              categorical_levels, label_to_int, int_to_label,
              leakage_excluded, and segment_id_codes (str(segment id) ->
              the code stored in the `segment_id_hash` feature) so the
              feature can be reproduced at inference time.

    Raises ValueError if the target column holds a label outside LABEL_ORDER
    or if the topology file has more than one row for a segment_id.
    """
    timelines = pd.read_csv(timelines_path)
    topo = pd.read_csv(topology_path)

    # Encode the target; unmapped labels become NaN and are rejected.
    y = timelines[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = timelines.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown actor_capability_tier values: {bad}")
    y = y.astype(int)
    groups = timelines["campaign_id"].copy()

    timelines = timelines.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS, errors="ignore",
    )

    # y and groups were captured before the join, so the join must keep
    # exactly one output row per timeline row. A duplicated segment_id in
    # the topology would multiply rows and silently misalign X with y.
    duplicated_ids = topo.loc[topo["segment_id"].duplicated(), "segment_id"].unique()
    if len(duplicated_ids) > 0:
        raise ValueError(
            f"Duplicate segment_id rows in topology: {duplicated_ids}"
        )

    topo_cols_needed = (
        ["segment_id"] + TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
    )
    timelines = timelines.merge(
        topo[topo_cols_needed],
        left_on="target_segment_id", right_on="segment_id", how="left",
    ).drop(columns=["segment_id"], errors="ignore")

    # Ordinal code per distinct segment id (-1 for a missing id). The
    # id -> code mapping only exists for this dataset, so it is exported in
    # meta for inference-time use.
    segment_cat = timelines["target_segment_id"].astype("category")
    timelines["segment_id_hash"] = segment_cat.cat.codes.astype(float)
    segment_id_codes = {
        str(segment): float(code)
        for code, segment in enumerate(segment_cat.cat.categories)
    }
    timelines = timelines.drop(columns=["target_segment_id"])

    timelines = _add_engineered_features(timelines)

    numeric_features = (
        DIRECT_NUMERIC_TIMESTEP_FEATURES
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "segment_id_hash",
            "c2_intensity_score", "escalation_velocity", "is_destructive",
            "dwell_efficiency", "is_post_detonation", "lotl_intensity_bin",
        ]
    )
    X_numeric = timelines[numeric_features].astype(float)

    # One-hot encode each categorical column that is present. The sorted
    # level list is recorded so inference can rebuild the same columns.
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_TIMESTEP_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES:
        if col not in timelines.columns:
            continue
        levels = sorted(timelines[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            timelines[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "leakage_excluded": LEAKY_COLUMNS,
        "segment_id_codes": segment_id_codes,
    }
    return X, y, groups, meta
|
|
|
|
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    segment_aggregates: dict | None = None,
) -> np.ndarray:
    """
    Encode timestep record(s) for inference in the training feature layout.

    `record` is a dict (one row) or a DataFrame of raw timeline columns;
    neither is modified. `meta` is the metadata dict produced by
    build_features / load_meta. `segment_aggregates`, when given, is a
    {column: value} dict written onto every row (e.g. one entry of
    build_segment_lookup).

    `segment_id_hash` is resolved in this order: a value already present in
    the record or in `segment_aggregates`; else, if `meta` has a
    `segment_id_codes` mapping and the record has `target_segment_id`, the
    mapped code (-1.0 for an id not in the mapping); else 0.0.

    Returns a float32 array of shape (n_rows, len(meta["feature_names"])).
    Numeric columns missing from the record are filled with 0; unseen
    categorical levels encode as all zeros.

    Raises KeyError if a raw column needed by _add_engineered_features is
    missing.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # The numeric block below is rebuilt on a fresh 0..n-1 index; give the
        # frame the same index so the one-hot blocks line up row-for-row in
        # the final concat instead of being aligned on the caller's labels.
        df = record.copy().reset_index(drop=True)

    if segment_aggregates is not None:
        for k, v in segment_aggregates.items():
            df[k] = v

    if "segment_id_hash" not in df.columns:
        segment_codes = meta.get("segment_id_codes")
        if segment_codes and "target_segment_id" in df.columns:
            # Keys are stringified ids; ids outside the mapping get -1.0.
            df["segment_id_hash"] = (
                df["target_segment_id"]
                .astype(str)
                .map(segment_codes)
                .fillna(-1.0)
                .astype(float)
            )
        else:
            df["segment_id_hash"] = 0.0
    if "target_segment_id" in df.columns:
        df = df.drop(columns=["target_segment_id"])

    df = _add_engineered_features(df)

    n_rows = len(df)
    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
        for col in meta["numeric_features"]
    })
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per known level, in the recorded order.
        block = block.reindex(
            columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0,
        )
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
|
|
|
|
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the feature metadata to `path` as indented JSON.

    Persists feature_names, numeric_features, categorical_levels,
    label_to_int, int_to_label (keys converted to str, since JSON object
    keys must be strings) and leakage_excluded (defaults to []). An optional
    `segment_id_codes` mapping is persisted too when `meta` carries one.
    Any other key in `meta` is not written.

    Raises KeyError if one of the required keys is missing from `meta`.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "leakage_excluded": meta.get("leakage_excluded", []),
    }
    if "segment_id_codes" in meta:
        serializable["segment_id_codes"] = {
            str(k): float(v) for k, v in meta["segment_id_codes"].items()
        }
    # Explicit encoding so the file does not depend on the platform default.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)
|
|
|
|
def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Read a metadata JSON file and return it as a dict.

    The keys of `int_to_label` are converted back from str to int; every
    other entry is returned exactly as parsed.
    """
    with open(path) as handle:
        loaded = json.load(handle)
    restored: dict[int, Any] = {}
    for class_id, label in loaded["int_to_label"].items():
        restored[int(class_id)] = label
    loaded["int_to_label"] = restored
    return loaded
|
|
|
|
def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
    """
    Build {segment_id: {topology feature values}} for inference-time lookup.

    Only topology feature columns that actually exist in the CSV are
    included. If a segment_id occurs more than once, the last row wins.
    """
    topo = pd.read_csv(topology_path)
    wanted = TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
    present = [name for name in wanted if name in topo.columns]
    return {
        row["segment_id"]: {name: row[name] for name in present}
        for _, row in topo.iterrows()
    }
|
|
|
|
if __name__ == "__main__":
    import sys

    # Smoke run: build the feature matrix and print a short summary.
    # Data directory is the first CLI argument, else the default location.
    cli_args = sys.argv[1:]
    if cli_args:
        base = Path(cli_args[0])
    else:
        base = Path("/mnt/user-data/uploads")

    timelines_csv = base / "attack_timelines.csv"
    topology_csv = base / "victim_topology.csv"
    X, y, groups, meta = build_features(timelines_csv, topology_csv)

    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} campaigns")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")
|
|