# cyb005-baseline-classifier / feature_engineering.py
# pradeep-xpert: Initial release: XGBoost + MLP for ransomware actor-tier attribution
# e8aa6ac (verified)
"""
feature_engineering.py
======================
Feature pipeline for the CYB005 baseline classifier.
Predicts `actor_capability_tier` (4-class) from per-timestep ransomware
campaign telemetry on the CYB005 sample dataset.
CSV inputs:
attack_timelines.csv (primary, one row per timestep; 500 campaigns,
nominally 75 timesteps each; 37,489 rows in total)
victim_topology.csv (per-segment defender configuration, joined
on target_segment_id; one row per segment)
campaign_summary.csv (per-campaign aggregates; reserved for future
work - many fields are post-hoc outcomes that
would leak the tier through training)
campaign_events.csv (discrete event log; reserved for future work)
Target classes (4):
lone_actor, organised_syndicate, raas_affiliate, nation_state_nexus
Sample size note
----------------
CYB005's sample is intentionally larger than its sister datasets (500
campaigns vs 100 in CYB002/3/4). The README states this is because
"benchmarks are conditional on small actor-tier subsets". The larger
sample makes tier attribution genuinely learnable here, where it was
not in CYB003/CYB004.
Leakage audit
-------------
Three columns inspected for tier leakage:
- `attribution_risk_score` - mean 0.016-0.026 across tiers, ranges
overlap heavily. NOT an oracle; keep.
- `living_off_land_score` - mean 0.05 (lone) to 0.20 (nation_state),
with substantial overlap (std 0.08-0.25). Real observable, not
an oracle; keep.
- `attack_phase` - 89% purity vs `detection_outcome` (recovery_in_progress
is a 1:1 alias), but for TIER prediction it has no oracle relationship.
Keep.
No columns are dropped for tier prediction. The model is trained on what
a SOC analyst would actually see at observation time.
Public API
----------
build_features(timelines_path, topology_path)
-> (X, y, groups, meta)
transform_single(record, meta, segment_aggregates=None) -> np.ndarray
save_meta(meta, path) / load_meta(path)
build_segment_lookup(topology_path) -> dict
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Classes are listed roughly in order of increasing capability
# (lone -> nation_state). Class imbalance: organised_syndicate (40%),
# raas_affiliate (30%), lone_actor (15%), nation_state_nexus (15%).
LABEL_ORDER = [
    "lone_actor",
    "organised_syndicate",
    "raas_affiliate",
    "nation_state_nexus",
]
# Label <-> integer lookups; the integer is the label's position in LABEL_ORDER.
LABEL_TO_INT = {name: idx for idx, name in enumerate(LABEL_ORDER)}
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
# ---------------------------------------------------------------------------
# Identifier and target columns - not features
# ---------------------------------------------------------------------------
# The label column and the row identifiers; none of these are used as features.
TARGET_COLUMN = "actor_capability_tier"
ID_COLUMNS = ["campaign_id", "actor_id"]
# Columns excluded for leakage. Intentionally empty: the module docstring's
# "Leakage audit" gives the rationale for keeping each candidate.
LEAKY_COLUMNS: list[str] = []
# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
# Columns of attack_timelines.csv that are passed through as numeric features.
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",  # position in 75-step lifecycle
    "files_encrypted_cumulative",
    "encryption_throughput_mbps",
    "endpoints_compromised",
    "lateral_move_count",
    "credential_harvest_count",
    "c2_bytes_exfiltrated",
    "defender_alert_score",
    "blast_radius_pct",
    "living_off_land_score",
    "attribution_risk_score",
    "data_exfiltrated_gb",
    "wiper_flag",
    "double_extortion_flag",
    "ir_activated",
]

# Columns of attack_timelines.csv that are one-hot encoded.
CATEGORICAL_TIMESTEP_FEATURES = [
    "attack_phase",  # 8 phases
    "detection_outcome",  # 5 outcomes incl. recovery_in_progress
]
# ---------------------------------------------------------------------------
# Victim topology features (joined on target_segment_id == segment_id)
# ---------------------------------------------------------------------------
# victim_topology.csv is segment-level (300 rows, one per segment). Since a
# campaign targets a single segment, the joined values are constant within
# a campaign. They describe the defender posture the actor is working
# against (conditioning context) without being tier oracles.
TOPOLOGY_NUMERIC_FEATURES = [
    "edr_coverage_rate",
    "network_segmentation_quality",
    "patch_posture_score",
    "ir_activation_latency_hrs",
    "endpoint_count",
    "ad_domain_complexity",
    "soc_maturity_score",
    "backup_recovery_prob",
    "backup_recovery_hrs_mean",
    "siem_rule_refresh_cadence_days",
]

# Topology columns that are one-hot encoded.
TOPOLOGY_CATEGORICAL_FEATURES = [
    "segment_type",  # 8 values: corporate_lan / dmz / cloud_workload / ot_ics_control / ...
    "soc_maturity_tier",  # tier label
    "backup_maturity_tier",  # 6 values: no_backup / local_only / network_attached / ...
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Six engineered features encoding tier-discriminative hypotheses.
Each is a behavioural composite that a threat analyst would compute
by hand to distinguish actor sophistication levels.
"""
df = df.copy()
# 1. C2 intensity: data exfiltration combined with encryption throughput.
# Nation-state and organised tiers tend to sustain higher both;
# lone actors burst then quiet down.
df["c2_intensity_score"] = np.log1p(
df["c2_bytes_exfiltrated"].clip(lower=0)
* df["encryption_throughput_mbps"].clip(lower=0)
).astype(float)
# 2. Escalation velocity: lateral moves per timestep elapsed.
# Higher = aggressive (raas/syndicate). Lower = methodical (apt).
df["escalation_velocity"] = (
df["lateral_move_count"] / df["timestep"].clip(lower=1)
).astype(float)
# 3. Destructive intent: wiper or double_extortion deployed.
# Wiper is a strong nation_state signature.
df["is_destructive"] = (
(df["wiper_flag"] == 1) | (df["double_extortion_flag"] == 1)
).astype(int)
# 4. Dwell efficiency: blast radius per timestep. High = fast,
# low = patient. Helps separate organised_syndicate (fast) from
# nation_state_nexus (patient).
df["dwell_efficiency"] = (
df["blast_radius_pct"] / df["timestep"].clip(lower=1)
).astype(float)
# 5. Post-detonation indicator. Timesteps after 50 are typically
# encryption_detonation / ransom_negotiation / recovery phases,
# which surface tier signal through ransom posture.
df["is_post_detonation"] = (df["timestep"] > 50).astype(int)
# 6. LotL intensity bin. Quartile bins of living_off_land_score
# give the trees a categorical view of an otherwise continuous
# tier-correlated feature.
df["lotl_intensity_bin"] = pd.cut(
df["living_off_land_score"], bins=[-0.01, 0.1, 0.3, 0.6, 1.01],
labels=[0, 1, 2, 3],
).astype(int)
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    timelines_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training matrix for actor-tier prediction.

    Loads both CSVs, joins topology onto the timelines, drops the target
    and identifier columns, adds engineered features, one-hot encodes the
    categoricals and returns (X, y, groups, meta).

    Parameters
    ----------
    timelines_path : path to attack_timelines.csv (one row per timestep).
    topology_path  : path to victim_topology.csv (one row per segment).

    Returns
    -------
    X      : feature frame; numeric columns first, then one-hot blocks.
             Missing values are filled with 0.0.
    y      : integer-encoded target (see LABEL_TO_INT).
    groups : campaign_id per row, aligned with X. Use it with
             GroupShuffleSplit / GroupKFold so train and test sets contain
             disjoint campaigns - each campaign generates 75
             highly-correlated timesteps.
    meta   : dict with feature_names, numeric_features, categorical_levels,
             label_to_int, int_to_label and leakage_excluded; this is what
             transform_single needs at inference time.

    Raises
    ------
    ValueError
        If the target column contains a value outside LABEL_ORDER.
    pandas.errors.MergeError
        If the topology file has more than one row for a segment_id.
    """
    timelines = pd.read_csv(timelines_path)
    topo = pd.read_csv(topology_path)

    # Encode the target first so unknown labels fail before any other work.
    y = timelines[TARGET_COLUMN].map(LABEL_TO_INT)
    unknown = y.isna()
    if unknown.any():
        bad = timelines.loc[unknown, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown actor_capability_tier values: {bad}")
    y = y.astype(int)
    groups = timelines["campaign_id"].copy()

    timelines = timelines.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS, errors="ignore",
    )

    # Join victim topology features on target_segment_id == segment_id.
    # validate="many_to_one" guards the row count: a duplicated segment_id
    # would otherwise duplicate timeline rows and silently misalign X
    # against y and groups, which are taken before the merge.
    topo_cols_needed = (
        ["segment_id"] + TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
    )
    timelines = timelines.merge(
        topo[topo_cols_needed],
        left_on="target_segment_id", right_on="segment_id", how="left",
        validate="many_to_one",
    ).drop(columns=["segment_id"], errors="ignore")

    # target_segment_id is high-cardinality (251 unique), so it is encoded
    # as one ordinal column of pandas category codes rather than one-hot.
    # Despite the column name this is not a hash: the codes depend on which
    # segment IDs are present in this particular file.
    timelines["segment_id_hash"] = (
        timelines["target_segment_id"].astype("category").cat.codes.astype(float)
    )
    timelines = timelines.drop(columns=["target_segment_id"])

    timelines = _add_engineered_features(timelines)

    numeric_features = (
        DIRECT_NUMERIC_TIMESTEP_FEATURES
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "segment_id_hash",
            "c2_intensity_score", "escalation_velocity", "is_destructive",
            "dwell_efficiency", "is_post_detonation", "lotl_intensity_bin",
        ]
    )
    X_numeric = timelines[numeric_features].astype(float)

    # One-hot encode each categorical against its sorted observed levels.
    # The levels are recorded in meta so inference reproduces the same
    # columns. A categorical column absent from the data is skipped.
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_TIMESTEP_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES:
        if col not in timelines.columns:
            continue
        levels = sorted(timelines[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            timelines[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    # The label maps and the leakage list are copied so that a caller
    # mutating meta cannot alter the module-level constants.
    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": dict(LABEL_TO_INT),
        "int_to_label": dict(INT_TO_LABEL),
        "leakage_excluded": list(LEAKY_COLUMNS),
    }
    return X, y, groups, meta
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    segment_aggregates: dict | None = None,
) -> np.ndarray:
    """
    Encode one timestep record (or a frame of records) for inference.

    Parameters
    ----------
    record : a dict of raw column values, or a DataFrame of such rows.
        It must contain the columns _add_engineered_features reads.
        The input is copied, never modified.
    meta : the dict returned by build_features / load_meta. Its
        numeric_features, categorical_levels and feature_names entries
        are used.
    segment_aggregates : optional {column: value} mapping written onto
        every row before encoding (presumably one entry of
        build_segment_lookup - confirm against callers).

    Returns
    -------
    float32 array of shape (n_rows, len(meta["feature_names"])), with
    columns in meta["feature_names"] order. Missing numeric columns,
    unseen categorical levels and NaNs all encode as 0.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # The blocks below are assembled positionally, so the index is
        # reset. With a non-default index (e.g. rows sliced from a larger
        # frame) the final concat would align on labels and add spurious
        # all-zero rows.
        df = record.copy().reset_index(drop=True)

    if segment_aggregates is not None:
        for k, v in segment_aggregates.items():
            df[k] = v

    # The training-time segment codes cannot be recomputed from a single
    # record, so unless the caller supplies segment_id_hash it defaults
    # to 0.0.
    # NOTE(review): 0 is also a valid training-time category code, so this
    # default is not a distinct "unknown" value.
    if "segment_id_hash" not in df.columns:
        df["segment_id_hash"] = 0.0
    if "target_segment_id" in df.columns:
        df = df.drop(columns=["target_segment_id"])

    df = _add_engineered_features(df)
    n_rows = len(df)

    # Numeric block: any feature absent from the record falls back to 0.0.
    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    # One-hot blocks: each categorical is encoded against the levels
    # recorded at training time, so values not seen in training yield an
    # all-zero row.
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per level, in training order.
        block = block.reindex(
            columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0,
        )
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final guard: match the exact training column order.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the persistable part of `meta` to `path` as indented JSON.

    Only feature_names, numeric_features, categorical_levels, label_to_int,
    int_to_label and leakage_excluded are written. The int_to_label keys
    are stringified because JSON object keys must be strings, and
    leakage_excluded defaults to an empty list when absent.
    """
    payload = {
        key: meta[key]
        for key in (
            "feature_names",
            "numeric_features",
            "categorical_levels",
            "label_to_int",
        )
    }
    payload["int_to_label"] = {
        str(idx): label for idx, label in meta["int_to_label"].items()
    }
    payload["leakage_excluded"] = meta.get("leakage_excluded", [])
    with open(path, "w") as handle:
        json.dump(payload, handle, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Read a meta dict written by save_meta.

    The int_to_label keys are stored as strings in JSON; they are
    converted back to int here.
    """
    with open(path) as handle:
        loaded = json.loads(handle.read())
    loaded["int_to_label"] = {
        int(key): label for key, label in loaded["int_to_label"].items()
    }
    return loaded
def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
    """
    Build {segment_id: {topology feature: value}} for inference-time lookup.

    Only the topology feature columns that exist in the CSV are included.
    If a segment_id occurs more than once, the last row wins.
    """
    topo = pd.read_csv(topology_path)
    wanted = TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
    available = [name for name in wanted if name in topo.columns]
    return {
        row["segment_id"]: {name: row[name] for name in available}
        for _, row in topo.iterrows()
    }
if __name__ == "__main__":
    # Smoke test: build the feature matrix from a data directory and print
    # a short summary. The directory is the first CLI argument, otherwise
    # the default upload location.
    import sys

    cli_args = sys.argv[1:]
    base = Path(cli_args[0]) if cli_args else Path("/mnt/user-data/uploads")

    features, labels, campaign_ids, info = build_features(
        base / "attack_timelines.csv",
        base / "victim_topology.csv",
    )

    print(f"X shape: {features.shape}")
    print(f"y shape: {labels.shape}")
    print(f"groups: {campaign_ids.nunique()} campaigns")
    print(f"n features: {len(info['feature_names'])}")
    print(f"label distribution:\n{labels.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {features.isnull().any().any()}")