"""
feature_engineering.py
======================

Feature pipeline for the CYB005 baseline classifier. Predicts
`actor_capability_tier` (4-class) from per-timestep ransomware campaign
telemetry on the CYB005 sample dataset.

CSV inputs:
    attack_timelines.csv   (primary, one row per timestep; nominally
                            500 campaigns x 75 timesteps. The sample is
                            reported as 37,489 rows, slightly fewer than
                            the 37,500 a complete grid would contain.)
    victim_topology.csv    (per-segment defender configuration, joined on
                            target_segment_id; one row per segment)
    campaign_summary.csv   (per-campaign aggregates; reserved for future
                            work - many fields are post-hoc outcomes that
                            would leak the tier through training)
    campaign_events.csv    (discrete event log; reserved for future work)

Target classes (4):
    lone_actor, organised_syndicate, raas_affiliate, nation_state_nexus

Sample size note
----------------
CYB005's sample is intentionally larger than its sister datasets
(500 campaigns vs 100 in CYB002/3/4). The README states this is because
"benchmarks are conditional on small actor-tier subsets". The larger
sample makes tier attribution genuinely learnable here, where it was not
in CYB003/CYB004.

Leakage audit
-------------
Three columns inspected for tier leakage:

- `attribution_risk_score` - mean 0.016-0.026 across tiers, ranges
  overlap heavily. NOT an oracle; keep.
- `living_off_land_score` - mean 0.05 (lone) to 0.20 (nation_state),
  with substantial overlap (std 0.08-0.25). Real observable, not an
  oracle; keep.
- `attack_phase` - 89% purity vs `detection_outcome`
  (recovery_in_progress is a 1:1 alias), but for TIER prediction it has
  no oracle relationship. Keep.

No columns are dropped for tier prediction. The model is trained on what
a SOC analyst would actually see at observation time.

Public API
----------
    build_features(timelines_path, topology_path) -> (X, y, groups, meta)
    transform_single(record, meta, segment_aggregates=None) -> np.ndarray
    save_meta(meta, path) / load_meta(path)
    build_segment_lookup(topology_path) -> dict

License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching
the dataset license. See README.md.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered roughly by capability: lone -> nation_state. Class imbalance:
# organised_syndicate (40%), raas_affiliate (30%), lone_actor (15%),
# nation_state_nexus (15%).
LABEL_ORDER = [
    "lone_actor",
    "organised_syndicate",
    "raas_affiliate",
    "nation_state_nexus",
]
# Integer code <-> label name; the code is the label's position in LABEL_ORDER.
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
LABEL_TO_INT = {name: code for code, name in INT_TO_LABEL.items()}

# ---------------------------------------------------------------------------
# Identifier and target columns - not features
# ---------------------------------------------------------------------------
ID_COLUMNS = ["campaign_id", "actor_id"]
TARGET_COLUMN = "actor_capability_tier"

# No columns dropped for leakage. See module docstring's "Leakage audit"
# for the rationale on each candidate.
LEAKY_COLUMNS: list[str] = []

# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",  # position in 75-step lifecycle
    "files_encrypted_cumulative",
    "encryption_throughput_mbps",
    "endpoints_compromised",
    "lateral_move_count",
    "credential_harvest_count",
    "c2_bytes_exfiltrated",
    "defender_alert_score",
    "blast_radius_pct",
    "living_off_land_score",
    "attribution_risk_score",
    "data_exfiltrated_gb",
    "wiper_flag",
    "double_extortion_flag",
    "ir_activated",
]

# Per-timestep categoricals to one-hot
CATEGORICAL_TIMESTEP_FEATURES = [
    "attack_phase",  # 8 phases
    "detection_outcome",  # 5 outcomes incl. recovery_in_progress
]

# ---------------------------------------------------------------------------
# Victim topology features (joined on target_segment_id == segment_id)
# ---------------------------------------------------------------------------
# victim_topology.csv is segment-level (300 rows, one per segment). Each
# campaign targets one segment, so these become per-campaign-constant
# features. They provide useful conditioning context (what defender
# posture is the actor working against) without being tier oracles.
TOPOLOGY_NUMERIC_FEATURES = [
    "edr_coverage_rate",
    "network_segmentation_quality",
    "patch_posture_score",
    "ir_activation_latency_hrs",
    "endpoint_count",
    "ad_domain_complexity",
    "soc_maturity_score",
    "backup_recovery_prob",
    "backup_recovery_hrs_mean",
    "siem_rule_refresh_cadence_days",
]

TOPOLOGY_CATEGORICAL_FEATURES = [
    "segment_type",  # 8 values: corporate_lan / dmz / cloud_workload / ot_ics_control / ...
    "soc_maturity_tier",  # tier label
    "backup_maturity_tier",  # 6 values: no_backup / local_only / network_attached / ...
]

# Fixed (not data-derived) bin edges for living_off_land_score. Intervals
# are right-closed: (-0.01, 0.1] -> 0, (0.1, 0.3] -> 1, (0.3, 0.6] -> 2,
# (0.6, 1.01] -> 3.
_LOTL_BIN_EDGES = [-0.01, 0.1, 0.3, 0.6, 1.01]
_LOTL_BIN_LABELS = [0, 1, 2, 3]

# Code written to `segment_id_hash` for a segment that was not seen when the
# feature metadata was built. Matches the -1 pandas assigns to missing values
# in `Series.cat.codes`.
_UNKNOWN_SEGMENT_CODE = -1


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with six engineered feature columns added.

    Each is a behavioural composite that a threat analyst would compute
    by hand to distinguish actor sophistication levels.

    The input must already contain the raw columns the composites read:
    c2_bytes_exfiltrated, encryption_throughput_mbps, lateral_move_count,
    timestep, wiper_flag, double_extortion_flag, blast_radius_pct and
    living_off_land_score. A missing column raises KeyError.
    """
    df = df.copy()

    # 1. C2 intensity: data exfiltration combined with encryption throughput.
    #    Nation-state and organised tiers tend to sustain higher both;
    #    lone actors burst then quiet down.
    df["c2_intensity_score"] = np.log1p(
        df["c2_bytes_exfiltrated"].clip(lower=0)
        * df["encryption_throughput_mbps"].clip(lower=0)
    ).astype(float)

    # 2. Escalation velocity: lateral moves per timestep elapsed.
    #    Higher = aggressive (raas/syndicate). Lower = methodical (apt).
    #    The divisor is clipped to >= 1 to avoid dividing by zero.
    df["escalation_velocity"] = (
        df["lateral_move_count"] / df["timestep"].clip(lower=1)
    ).astype(float)

    # 3. Destructive intent: wiper or double_extortion deployed.
    #    Wiper is a strong nation_state signature.
    df["is_destructive"] = (
        (df["wiper_flag"] == 1) | (df["double_extortion_flag"] == 1)
    ).astype(int)

    # 4. Dwell efficiency: blast radius per timestep. High = fast,
    #    low = patient. Helps separate organised_syndicate (fast) from
    #    nation_state_nexus (patient).
    df["dwell_efficiency"] = (
        df["blast_radius_pct"] / df["timestep"].clip(lower=1)
    ).astype(float)

    # 5. Post-detonation indicator. Timesteps after 50 are typically
    #    encryption_detonation / ransom_negotiation / recovery phases,
    #    which surface tier signal through ransom posture.
    df["is_post_detonation"] = (df["timestep"] > 50).astype(int)

    # 6. LotL intensity bin. Fixed-threshold bins of living_off_land_score
    #    give the trees a categorical view of an otherwise continuous
    #    tier-correlated feature.
    #    A missing score is treated as 0.0 (the same default the pipeline
    #    applies to missing numerics) and out-of-range scores are clipped
    #    into [0, 1]; otherwise pd.cut yields NaN and the integer cast
    #    raises.
    lotl_score = (
        df["living_off_land_score"]
        .astype(float)
        .fillna(0.0)
        .clip(lower=0.0, upper=1.0)
    )
    df["lotl_intensity_bin"] = pd.cut(
        lotl_score,
        bins=_LOTL_BIN_EDGES,
        labels=_LOTL_BIN_LABELS,
    ).astype(int)

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    timelines_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load CSVs, join topology, drop target + identifiers, engineer
    features, one-hot encode, return (X, y, groups, meta).

    `groups` is a Series of campaign_id values aligned with X. Use it
    with GroupShuffleSplit / GroupKFold so train and test sets contain
    disjoint campaigns - each campaign generates 75 highly-correlated
    timesteps.

    `meta` records everything `transform_single` needs to reproduce the
    encoding: feature order, numeric feature names, one-hot levels, the
    label maps and the segment-id -> integer code map
    (`segment_id_codes`).

    Raises:
        ValueError: if the target column holds a value outside LABEL_TO_INT.
        pandas.errors.MergeError: if victim_topology.csv has more than one
            row for the same segment_id.
    """
    timelines = pd.read_csv(timelines_path)
    topo = pd.read_csv(topology_path)

    y = timelines[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = timelines.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown actor_capability_tier values: {bad}")
    y = y.astype(int)

    groups = timelines["campaign_id"].copy()

    timelines = timelines.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS,
        errors="ignore",
    )

    # Join victim topology features on target_segment_id == segment_id.
    # validate="many_to_one" makes a duplicated segment_id fail loudly; a
    # silent duplicate would multiply timeline rows and leave X longer than
    # (and misaligned with) y and groups, which were taken before the join.
    topo_cols_needed = (
        ["segment_id"]
        + TOPOLOGY_NUMERIC_FEATURES
        + TOPOLOGY_CATEGORICAL_FEATURES
    )
    timelines = timelines.merge(
        topo[topo_cols_needed],
        left_on="target_segment_id",
        right_on="segment_id",
        how="left",
        validate="many_to_one",
    ).drop(columns=["segment_id"], errors="ignore")

    # target_segment_id is high-cardinality (251 unique). Encode it as a
    # single integer-coded column rather than one-hot. Despite the column
    # name this is not a hash: it is the pandas category code, i.e. the
    # position of the id among the sorted unique ids (-1 when missing).
    # The id -> code map is kept in meta so inference can reuse it.
    segment_categories = timelines["target_segment_id"].astype("category")
    timelines["segment_id_hash"] = segment_categories.cat.codes.astype(float)
    segment_id_codes = {
        str(segment): code
        for code, segment in enumerate(segment_categories.cat.categories)
    }
    timelines = timelines.drop(columns=["target_segment_id"])

    timelines = _add_engineered_features(timelines)

    numeric_features = (
        DIRECT_NUMERIC_TIMESTEP_FEATURES
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "segment_id_hash",
            "c2_intensity_score",
            "escalation_velocity",
            "is_destructive",
            "dwell_efficiency",
            "is_post_detonation",
            "lotl_intensity_bin",
        ]
    )
    X_numeric = timelines[numeric_features].astype(float)

    # One-hot encode each categorical against its sorted observed levels so
    # the column order is deterministic and can be replayed at inference.
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_TIMESTEP_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES:
        if col not in timelines.columns:
            continue
        levels = sorted(timelines[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            timelines[col].astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "leakage_excluded": LEAKY_COLUMNS,
        "segment_id_codes": segment_id_codes,
    }
    return X, y, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    segment_aggregates: dict | None = None,
) -> np.ndarray:
    """
    Encode one timestep record (or a frame of records) for inference.

    Args:
        record: a dict of raw timestep fields, or a DataFrame with one row
            per record. It must contain the raw columns read by
            `_add_engineered_features`.
        meta: metadata returned by `build_features` / `load_meta`.
        segment_aggregates: optional {column: value} topology values (for
            example one entry of `build_segment_lookup`) broadcast onto
            every row.

    Returns:
        float32 array of shape (n_records, len(meta["feature_names"])),
        columns in `meta["feature_names"]` order.

    `segment_id_hash` is taken from the input when supplied. Otherwise it is
    looked up from `target_segment_id` via `meta["segment_id_codes"]`, with
    -1 for a segment not present in that map. When the metadata has no such
    map (older meta files) it falls back to 0.0.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        df = record.copy()
    # The numeric block below is rebuilt on a fresh 0..n-1 index; normalise
    # the input index so the one-hot blocks line up with it in pd.concat.
    df = df.reset_index(drop=True)

    if segment_aggregates is not None:
        for k, v in segment_aggregates.items():
            df[k] = v

    if "segment_id_hash" not in df.columns:
        segment_id_codes = meta.get("segment_id_codes")
        if segment_id_codes and "target_segment_id" in df.columns:
            df["segment_id_hash"] = (
                df["target_segment_id"]
                .map(
                    lambda segment: segment_id_codes.get(
                        str(segment), _UNKNOWN_SEGMENT_CODE
                    )
                )
                .astype(float)
            )
        else:
            df["segment_id_hash"] = 0.0
    if "target_segment_id" in df.columns:
        df = df.drop(columns=["target_segment_id"])

    df = _add_engineered_features(df)

    n_rows = len(df)
    # Numeric features absent from the input default to 0.0.
    numeric = pd.DataFrame(
        {
            col: (
                df[col].astype(float).to_numpy()
                if col in df.columns
                else np.zeros(n_rows, dtype=float)
            )
            for col in meta["numeric_features"]
        }
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        values = df[col] if col in df.columns else pd.Series([None] * n_rows)
        block = pd.get_dummies(
            values.astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        # Guarantee one column per training-time level, in training order;
        # values outside the known levels encode as all zeros.
        block = block.reindex(
            columns=[f"{col}_{lvl}" for lvl in levels],
            fill_value=0,
        )
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the feature metadata to `path` as UTF-8 JSON.

    `int_to_label` keys are written as strings because JSON object keys
    must be strings; `load_meta` converts them back to int.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "leakage_excluded": meta.get("leakage_excluded", []),
        "segment_id_codes": meta.get("segment_id_codes", {}),
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by `save_meta`, restoring int label keys."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
    """Build {segment_id: {topology feature values}} for inference-time lookup.

    Only the topology feature columns actually present in the CSV are
    included. If a segment_id appears more than once, the last row wins.
    """
    topo = pd.read_csv(topology_path)
    wanted = TOPOLOGY_NUMERIC_FEATURES + TOPOLOGY_CATEGORICAL_FEATURES
    present = [c for c in wanted if c in topo.columns]
    records = topo[present].to_dict("records")
    return dict(zip(topo["segment_id"].tolist(), records))


if __name__ == "__main__":
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, groups, meta = build_features(
        base / "attack_timelines.csv",
        base / "victim_topology.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} campaigns")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")