"""
feature_engineering.py
======================
Feature pipeline for the CYB010 baseline classifier.

Predicts `attack_lifecycle_phase` (5-class attack phase) from per-event
features on the CYB010 sample dataset.

CSV inputs:
    security_events.csv    (primary, one row per event, 21,896 events)
    host_inventory.csv     (per-host registry, joined for host context)
    alert_records.csv      (per-alert records; reserved)
    incident_summary.csv   (per-incident summaries; reserved)

Target classes (5):
    benign_background, initial_access, lateral_movement,
    persistence_establishment, exfiltration_or_impact

Why this task
-------------
The CYB010 README's central concept is the "5-phase attack lifecycle
state machine", and `attack_lifecycle_phase` is the data's headline
target. We piloted six candidate targets and found it gives the
strongest honest result on the sample (acc 0.95, macro-F1 0.78,
ROC-AUC 0.99 with group-aware split on incident_id).

The other README-suggested targets either have unrecoverable structural
leakage or are weaker after honest leak removal:

  - `threat_actor_profile` 5-class works (acc 0.84) but is benign-driven
    - 4-class malicious-only collapses to acc 0.57 vs majority 0.61.
  - `label_true_positive` on alerts has 9 oracle features; after
    dropping all of them, honest acc 0.80, AUC 0.89 (documented as a
    secondary finding in leakage_diagnostic.json).
  - `mitre_tactic` 14-class hits 0.90 acc but macro-F1 0.37 - imbalance
    gaming (benign class dominates at 57%).
  - `event_class` 12-class is unlearnable (acc 0.35 vs majority 0.42).

Group structure
---------------
500 incidents x ~44 events each. The per-event task has clear group
structure: events from the same incident share host, threat actor, and
phase trajectory. Group-aware split by `incident_id` is required to
prevent train/test contamination. With 500 incidents, ~75 test
incidents per fold gives reasonable estimation precision.

Leakage audit
-------------
Six columns dropped from features because they're structural oracles
for the target:

  1. `mitre_tactic`: when == "benign", deterministically pins
     attack_lifecycle_phase == "benign_background" (12,448 cases - all
     benign events).
  2. `mitre_technique_id`: perfect oracle for `mitre_tactic` by ATT&CK
     design (54 techniques, each maps to exactly one tactic). Dropped
     because it indirectly encodes the benign vs malicious distinction.
  3. `label_malicious`: when False, perfect oracle for
     benign_background phase.
  4. `threat_actor_id`: when == "NONE", perfect oracle for benign
     profile/phase. The non-"NONE" actor IDs are 10 distinct labels
     that would also leak actor profile information indirectly.
  5. `threat_actor_profile`: contains "benign_user" which trivially
     identifies benign_background phase.
  6. `event_type`: many event types are phase-specific
     (`c2_beacon_outbound` -> 99% exfiltration_or_impact). Dropped to
     avoid this near-oracle path.

KEPT features that are informative but NOT oracles:

  - `event_class` (12 values): max purity 0.87, mean 0.72 - real signal
    with substantial overlap. C2 beacons (network_flow class) hit 65%
    exfil phase but also 29% benign. Strong feature, kept.
  - `severity_level`, `cvss_score_analogue`: per-event severity is a
    real observable, correlates with phase, has overlap.
  - `label_log_tampered`: real observable (APTs tamper more),
    correlates with malicious phases but not deterministic.
  - `log_source_type`, `siem_platform`: not phase-deterministic.
  - All host context features.

Public API
----------
    build_features(events_path, hosts_path) -> (X, y, ids, groups, meta)
    transform_single(record, meta, host_lookup=None) -> np.ndarray
    save_meta(meta, path) / load_meta(path)
    build_host_lookup(hosts_path) -> dict

License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd


# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------

# Ordered by attack progression.
LABEL_ORDER = [
    "benign_background",
    "initial_access",
    "lateral_movement",
    "persistence_establishment",
    "exfiltration_or_impact",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}


# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------

ID_COLUMNS = [
    "event_id",
    "host_id",
    "incident_id",
    "timestamp",
    "user_id",
    "source_ip",
    "dest_ip",
    "raw_log_payload",
]

TARGET_COLUMN = "attack_lifecycle_phase"
GROUP_COLUMN = "incident_id"

# Oracle columns dropped from features.
ORACLE_COLUMNS = [
    "mitre_tactic",          # benign value -> benign_background phase
    "mitre_technique_id",    # ATT&CK technique -> tactic deterministic
    "label_malicious",       # False -> benign_background
    "threat_actor_id",       # NONE -> benign
    "threat_actor_profile",  # benign_user -> benign_background
    "event_type",            # many event types phase-specific (e.g. c2_beacon_outbound)
]


# ---------------------------------------------------------------------------
# Per-event numeric features
# ---------------------------------------------------------------------------

EVENT_NUMERIC_FEATURES = [
    "source_port",
    "dest_port",
    "cvss_score_analogue",
    "label_log_tampered",    # bool kept as observable
    "label_false_positive",  # bool kept as observable (all False on events)
]

EVENT_CATEGORICAL_FEATURES = [
    "event_class",      # 12 values
    "log_source_type",  # 8 values
    "severity_level",   # 5 values
]


# ---------------------------------------------------------------------------
# Host features (joined on host_id from host_inventory.csv)
# ---------------------------------------------------------------------------

HOST_NUMERIC_FEATURES = [
    "edr_agent_installed",
    "patch_compliance_level",
    "vulnerability_count_open",
]

HOST_CATEGORICAL_FEATURES = [
    "os_type",                # 7 values
    "host_role",              # 10 values
    "network_segment",        # 8 values
    "defender_posture_tier",  # 4 values
    "criticality_rating",     # 4 values
    "cloud_provider",         # 4 values
    "siem_platform",          # 8 values
]


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------

def _column_or_default(
    df: pd.DataFrame, name: str, default: float = 0.0
) -> pd.Series:
    """Return ``df[name]`` if present, else a constant Series on ``df.index``."""
    if name in df.columns:
        return df[name]
    return pd.Series(default, index=df.index, dtype=float)


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with nine engineered columns appended.

    The columns fall into six groups (time of day, log-scaled CVSS,
    high-CVSS flag, destination-port category, web-port flag, risk
    composite). Missing input columns are tolerated: an absent
    ``cvss_score_analogue`` or ``dest_port`` is treated as all zeros, an
    absent ``timestamp`` yields hour 12 / not off-hours / not weekend, and
    an absent ``patch_compliance_level`` yields ``risk_composite == 0.0``.
    This matters at inference time, where a record may carry only a subset
    of the training columns.
    """
    df = df.copy()

    # 1. Hour of day (0-23), off-hours flag, weekend flag from timestamp.
    if "timestamp" in df.columns:
        ts = pd.to_datetime(df["timestamp"], errors="coerce")
        hour = ts.dt.hour
        # Unparseable timestamps fall back to midday / weekday.
        df["hour_of_day"] = hour.fillna(12).astype(int)
        df["is_off_hours"] = ((hour < 9) | (hour > 17)).fillna(False).astype(int)
        df["is_weekend"] = (ts.dt.weekday >= 5).fillna(False).astype(int)
    else:
        df["hour_of_day"] = 12
        df["is_off_hours"] = 0
        df["is_weekend"] = 0

    # `df.get(col, 0)` would hand back a bare int when the column is absent,
    # and the Series methods below would then fail; always work on a Series.
    cvss = _column_or_default(df, "cvss_score_analogue")

    # 2. Log-scaled CVSS (heavy-tailed). NaN inputs stay NaN here.
    df["log_cvss"] = np.log1p(cvss.clip(lower=0)).astype(float)

    # 3. High-CVSS indicator (NaN compares False -> 0).
    df["is_high_cvss"] = (cvss >= 7.0).astype(int)

    # 4. Port category: well-known (<1024) vs dynamic (>=49152).
    dest = _column_or_default(df, "dest_port").fillna(0).astype(int)
    df["is_well_known_port"] = (dest < 1024).astype(int)
    df["is_dynamic_port"] = (dest >= 49152).astype(int)

    # 5. Destination port is a common web port. Rough proxy for web traffic.
    df["is_outbound_web"] = dest.isin([80, 443, 8080, 8443]).astype(int)

    # 6. Risk composite: CVSS x defender_weakness. Higher composite -> later phase.
    if "patch_compliance_level" in df.columns:
        df["risk_composite"] = (
            cvss.fillna(0) * (1 - df["patch_compliance_level"].fillna(1))
        ).astype(float)
    else:
        df["risk_composite"] = 0.0

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    events_path: str | Path,
    hosts_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training matrix from the event and host CSVs.

    Loads security_events.csv, joins host_inventory.csv on ``host_id``,
    engineers features, drops target + identifier + oracle columns and
    one-hot encodes the categoricals.

    Returns ``(X, y, ids, groups, meta)`` where ``X`` is the feature frame,
    ``y`` the integer-encoded target, ``ids`` the ``event_id`` column,
    ``groups`` the ``incident_id`` column and ``meta`` the encoding metadata
    consumed by ``transform_single``.

    Raises ``ValueError`` if the target column contains a value outside
    ``LABEL_ORDER``.
    """
    events = pd.read_csv(events_path)
    hosts = pd.read_csv(hosts_path)

    y = events[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = events.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown attack_lifecycle_phase values: {bad}")
    y = y.astype(int)

    ids = events["event_id"].copy()
    groups = events[GROUP_COLUMN].copy()

    host_cols_needed = (
        ["host_id"] + HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES
    )
    # A repeated host_id would multiply event rows in the left join and put X
    # out of step with y / ids / groups. Keep the last row per host, which is
    # also the row build_host_lookup() ends up with.
    host_table = hosts[host_cols_needed].drop_duplicates(
        subset="host_id", keep="last"
    )
    events = events.merge(host_table, on="host_id", how="left")

    # Apply engineered features BEFORE dropping timestamp
    events = _add_engineered_features(events)

    events = events.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + ORACLE_COLUMNS,
        errors="ignore",
    )

    numeric_features = (
        EVENT_NUMERIC_FEATURES
        + HOST_NUMERIC_FEATURES
        + [
            "hour_of_day",
            "is_off_hours",
            "is_weekend",
            "log_cvss",
            "is_high_cvss",
            "is_well_known_port",
            "is_dynamic_port",
            "is_outbound_web",
            "risk_composite",
        ]
    )
    numeric_features = [c for c in numeric_features if c in events.columns]
    # bool columns cast to 0.0 / 1.0 directly.
    X_numeric = events[numeric_features].astype(float)

    all_categorical = EVENT_CATEGORICAL_FEATURES + HOST_CATEGORICAL_FEATURES
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in all_categorical:
        if col not in events.columns:
            continue
        # Levels exclude missing values, so a NaN row encodes as all zeros.
        levels = sorted(events[col].dropna().astype(str).unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            events[col].astype(str).astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "oracle_excluded": ORACLE_COLUMNS,
    }
    return X, y, ids, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    host_lookup: dict | None = None,
) -> np.ndarray:
    """
    Encode one event record (or a frame of records) for inference.

    ``record`` is a dict for a single event or a DataFrame with one row per
    event. ``meta`` is the dict produced by ``build_features`` (or
    ``load_meta``); only ``feature_names``, ``numeric_features`` and
    ``categorical_levels`` are read. ``host_lookup`` is the optional mapping
    from ``build_host_lookup``; each row's ``host_id`` is looked up
    individually and host features are filled in only for columns the
    record does not already carry.

    Returns a float32 array of shape ``(n_rows, len(meta["feature_names"]))``.
    Missing numeric inputs and unseen categorical levels encode as zeros.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Normalise the index so every block built below lines up row-for-row
        # when concatenated; a filtered frame would otherwise misalign.
        df = record.copy().reset_index(drop=True)

    if host_lookup is not None and "host_id" in df.columns:
        per_row = [host_lookup.get(h, {}) for h in df["host_id"]]
        new_keys: list[str] = []
        for feats in per_row:
            for key in feats:
                if key not in df.columns and key not in new_keys:
                    new_keys.append(key)
        for key in new_keys:
            df[key] = [feats.get(key) for feats in per_row]

    df = _add_engineered_features(df)

    numeric = pd.DataFrame(
        {
            col: _column_or_default(df, col).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        if col in df.columns:
            val = df[col].astype(str)
        else:
            val = pd.Series([None] * len(df), index=df.index).astype(str)
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        # Guarantee one column per training level, in training order.
        wanted = [f"{col}_{lvl}" for lvl in levels]
        block = block.reindex(columns=wanted, fill_value=0)
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the encoding metadata to ``path`` as JSON."""
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        # JSON object keys must be strings; load_meta() converts them back.
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "oracle_excluded": meta.get("oracle_excluded", []),
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by ``save_meta`` and restore int label keys."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_host_lookup(hosts_path: str | Path) -> dict[str, dict]:
    """
    Build {host_id: {host feature values}} for inference-time lookup.

    Only the host feature columns actually present in the CSV are included.
    If a ``host_id`` appears more than once, the last row wins.
    """
    hosts = pd.read_csv(hosts_path)
    wanted = HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES
    cols = [c for c in wanted if c in hosts.columns]
    out = {}
    for _, row in hosts.iterrows():
        out[row["host_id"]] = {c: row[c] for c in cols}
    return out


if __name__ == "__main__":
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, ids, groups, meta = build_features(
        base / "security_events.csv",
        base / "host_inventory.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} unique incidents")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")