cyb010-baseline-classifier / feature_engineering.py
pradeep-xpert's picture
Initial release: attack_lifecycle_phase 5-class baseline + 11-oracle-path leakage diagnostic
e2c4702 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB010 baseline classifier.
Predicts `attack_lifecycle_phase` (5-class attack phase) from per-event
features on the CYB010 sample dataset.
CSV inputs:
security_events.csv (primary, one row per event, 21,896 events)
host_inventory.csv (per-host registry, joined for host context)
alert_records.csv (per-alert records; reserved)
incident_summary.csv (per-incident summaries; reserved)
Target classes (5):
benign_background, initial_access, lateral_movement,
persistence_establishment, exfiltration_or_impact
Why this task
-------------
The CYB010 README's central concept is the "5-phase attack lifecycle
state machine", and `attack_lifecycle_phase` is the data's headline
target. We piloted six candidate targets and found it gives the
strongest honest result on the sample (acc 0.95, macro-F1 0.78,
ROC-AUC 0.99 with group-aware split on incident_id).
The other README-suggested targets either have unrecoverable structural
leakage or are weaker after honest leak removal:
- `threat_actor_profile` 5-class works (acc 0.84) but is benign-driven
- 4-class malicious-only collapses to acc 0.57 vs majority 0.61.
- `label_true_positive` on alerts has 9 oracle features; after dropping
all of them, honest acc 0.80, AUC 0.89 (documented as a secondary
finding in leakage_diagnostic.json).
- `mitre_tactic` 14-class hits 0.90 acc but macro-F1 0.37 - imbalance
gaming (benign class dominates at 57%).
- `event_class` 12-class is unlearnable (acc 0.35 vs majority 0.42).
Group structure
---------------
500 incidents x ~44 events each. The per-event task has clear group
structure: events from the same incident share host, threat actor, and
phase trajectory. Group-aware split by `incident_id` is required to
prevent train/test contamination. With 500 incidents, ~75 test
incidents per fold gives reasonable estimation precision.
Leakage audit
-------------
Four columns dropped from features because they're structural oracles
for the target:
1. `mitre_tactic`: when == "benign", deterministically pins
attack_lifecycle_phase == "benign_background" (12,448 cases - all
benign events).
2. `mitre_technique_id`: perfect oracle for `mitre_tactic` by ATT&CK
design (54 techniques, each maps to exactly one tactic). Dropped
because it indirectly encodes the benign vs malicious distinction.
3. `label_malicious`: when False, perfect oracle for
benign_background phase.
4. `threat_actor_id`: when == "NONE", perfect oracle for benign
profile/phase. The non-"NONE" actor IDs are 10 distinct labels
that would also leak actor profile information indirectly.
5. `threat_actor_profile`: contains "benign_user" which trivially
identifies benign_background phase.
6. `event_type`: many event types are phase-specific
(`c2_beacon_outbound` -> 99% exfiltration_or_impact). Dropped to
avoid this near-oracle path.
KEPT features that are informative but NOT oracles:
- `event_class` (12 values): max purity 0.87, mean 0.72 - real signal
with substantial overlap. C2 beacons (network_flow class) hit 65%
exfil phase but also 29% benign. Strong feature, kept.
- `severity_level`, `cvss_score_analogue`: per-event severity is a
real observable, correlates with phase, has overlap.
- `label_log_tampered`: real observable (APTs tamper more), correlates
with malicious phases but not deterministic.
- `log_source_type`, `siem_platform`: not phase-deterministic.
- All host context features.
Public API
----------
build_features(events_path, hosts_path) -> (X, y, ids, groups, meta)
transform_single(record, meta, host_lookup=None) -> np.ndarray
save_meta(meta, path) / load_meta(path)
build_host_lookup(hosts_path) -> dict
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered by attack progression.
LABEL_ORDER = [
"benign_background",
"initial_access",
"lateral_movement",
"persistence_establishment",
"exfiltration_or_impact",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}
# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = [
"event_id", "host_id", "incident_id", "timestamp", "user_id",
"source_ip", "dest_ip", "raw_log_payload",
]
TARGET_COLUMN = "attack_lifecycle_phase"
GROUP_COLUMN = "incident_id"
# Oracle columns dropped from features.
ORACLE_COLUMNS = [
"mitre_tactic", # benign value -> benign_background phase
"mitre_technique_id", # ATT&CK technique -> tactic deterministic
"label_malicious", # False -> benign_background
"threat_actor_id", # NONE -> benign
"threat_actor_profile", # benign_user -> benign_background
"event_type", # many event types phase-specific (e.g. c2_beacon_outbound)
]
# ---------------------------------------------------------------------------
# Per-event numeric features
# ---------------------------------------------------------------------------
EVENT_NUMERIC_FEATURES = [
"source_port",
"dest_port",
"cvss_score_analogue",
"label_log_tampered", # bool kept as observable
"label_false_positive", # bool kept as observable (all False on events)
]
EVENT_CATEGORICAL_FEATURES = [
"event_class", # 12 values
"log_source_type", # 8 values
"severity_level", # 5 values
]
# ---------------------------------------------------------------------------
# Host features (joined on host_id from host_inventory.csv)
# ---------------------------------------------------------------------------
HOST_NUMERIC_FEATURES = [
"edr_agent_installed",
"patch_compliance_level",
"vulnerability_count_open",
]
HOST_CATEGORICAL_FEATURES = [
"os_type", # 7 values
"host_role", # 10 values
"network_segment", # 8 values
"defender_posture_tier", # 4 values
"criticality_rating", # 4 values
"cloud_provider", # 4 values
"siem_platform", # 8 values
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Six engineered features encoding phase-discriminative hypotheses.
Each composite is something a SOC analyst would compute by hand.
"""
df = df.copy()
# 1. Hour of day (0-23) from timestamp, if available
if "timestamp" in df.columns:
ts = pd.to_datetime(df["timestamp"], errors="coerce")
df["hour_of_day"] = ts.dt.hour.fillna(12).astype(int)
df["is_off_hours"] = ((ts.dt.hour < 9) | (ts.dt.hour > 17)).fillna(False).astype(int)
df["is_weekend"] = (ts.dt.weekday >= 5).fillna(False).astype(int)
else:
df["hour_of_day"] = 12
df["is_off_hours"] = 0
df["is_weekend"] = 0
# 2. Log-scaled CVSS (heavy-tailed)
df["log_cvss"] = np.log1p(
df.get("cvss_score_analogue", 0).clip(lower=0)
).astype(float)
# 3. High-CVSS indicator
df["is_high_cvss"] = (
df.get("cvss_score_analogue", 0) >= 7.0
).astype(int)
# 4. Port category: well-known (<1024) vs registered vs dynamic
dest = df.get("dest_port", 0).fillna(0).astype(int)
df["is_well_known_port"] = (dest < 1024).astype(int)
df["is_dynamic_port"] = (dest >= 49152).astype(int)
# 5. Network direction: same-network if source_port equals dest_port
# OR if specific dest_port matches common service. Rough proxy.
df["is_outbound_web"] = (dest.isin([80, 443, 8080, 8443])).astype(int)
# 6. Risk composite: CVSS x defender_weakness. Higher composite -> later phase.
if "patch_compliance_level" in df.columns:
df["risk_composite"] = (
df["cvss_score_analogue"].fillna(0) *
(1 - df["patch_compliance_level"].fillna(1))
).astype(float)
else:
df["risk_composite"] = 0.0
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
events_path: str | Path,
hosts_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, pd.Series, dict[str, Any]]:
"""
Load security_events.csv, join host_inventory.csv, drop target +
identifiers + oracle columns, engineer features, one-hot encode,
return (X, y, ids, groups, meta).
"""
events = pd.read_csv(events_path)
hosts = pd.read_csv(hosts_path)
y = events[TARGET_COLUMN].map(LABEL_TO_INT)
if y.isna().any():
bad = events.loc[y.isna(), TARGET_COLUMN].unique()
raise ValueError(f"Unknown attack_lifecycle_phase values: {bad}")
y = y.astype(int)
ids = events["event_id"].copy()
groups = events[GROUP_COLUMN].copy()
host_cols_needed = (
["host_id"] + HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES
)
events = events.merge(
hosts[host_cols_needed], on="host_id", how="left",
)
# Apply engineered features BEFORE dropping timestamp
events = _add_engineered_features(events)
events = events.drop(
columns=ID_COLUMNS + [TARGET_COLUMN] + ORACLE_COLUMNS,
errors="ignore",
)
numeric_features = (
EVENT_NUMERIC_FEATURES
+ HOST_NUMERIC_FEATURES
+ [
"hour_of_day", "is_off_hours", "is_weekend",
"log_cvss", "is_high_cvss",
"is_well_known_port", "is_dynamic_port", "is_outbound_web",
"risk_composite",
]
)
numeric_features = [c for c in numeric_features if c in events.columns]
X_numeric = events[numeric_features].apply(
lambda s: s.astype(float) if s.dtype != bool else s.astype(int).astype(float)
)
all_categorical = EVENT_CATEGORICAL_FEATURES + HOST_CATEGORICAL_FEATURES
categorical_levels: dict[str, list[str]] = {}
blocks: list[pd.DataFrame] = []
for col in all_categorical:
if col not in events.columns:
continue
levels = sorted(events[col].dropna().astype(str).unique().tolist())
categorical_levels[col] = levels
block = pd.get_dummies(
events[col].astype(str).astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
blocks.append(block)
X = pd.concat(
[X_numeric.reset_index(drop=True)]
+ [b.reset_index(drop=True) for b in blocks],
axis=1,
).fillna(0.0)
meta = {
"feature_names": X.columns.tolist(),
"numeric_features": numeric_features,
"categorical_levels": categorical_levels,
"label_to_int": LABEL_TO_INT,
"int_to_label": INT_TO_LABEL,
"oracle_excluded": ORACLE_COLUMNS,
}
return X, y, ids, groups, meta
def transform_single(
record: dict | pd.DataFrame,
meta: dict[str, Any],
host_lookup: dict | None = None,
) -> np.ndarray:
"""Encode a single event record for inference."""
if isinstance(record, dict):
df = pd.DataFrame([record.copy()])
else:
df = record.copy()
if host_lookup is not None and "host_id" in df.columns:
host_id = df["host_id"].iloc[0]
host_feats = host_lookup.get(host_id, {})
for k, v in host_feats.items():
if k not in df.columns:
df[k] = v
df = _add_engineered_features(df)
numeric = pd.DataFrame()
for col in meta["numeric_features"]:
s = df.get(col, pd.Series([0.0] * len(df)))
if s.dtype == bool:
s = s.astype(int)
numeric[col] = s.astype(float).values
blocks: list[pd.DataFrame] = [numeric]
for col, levels in meta["categorical_levels"].items():
val = df.get(col, pd.Series([None] * len(df))).astype(str)
block = pd.get_dummies(
val.astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
for lvl in levels:
cname = f"{col}_{lvl}"
if cname not in block.columns:
block[cname] = 0
block = block[[f"{col}_{lvl}" for lvl in levels]]
blocks.append(block)
X = pd.concat(blocks, axis=1).fillna(0.0)
X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
serializable = {
"feature_names": meta["feature_names"],
"numeric_features": meta["numeric_features"],
"categorical_levels": meta["categorical_levels"],
"label_to_int": meta["label_to_int"],
"int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
"oracle_excluded": meta.get("oracle_excluded", []),
}
with open(path, "w") as f:
json.dump(serializable, f, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
with open(path) as f:
meta = json.load(f)
meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
return meta
def build_host_lookup(hosts_path: str | Path) -> dict[str, dict]:
"""Build {host_id: {host feature values}} for inference-time lookup."""
hosts = pd.read_csv(hosts_path)
cols = HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES
out = {}
for _, row in hosts.iterrows():
out[row["host_id"]] = {c: row[c] for c in cols if c in hosts.columns}
return out
if __name__ == "__main__":
import sys
base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
X, y, ids, groups, meta = build_features(
base / "security_events.csv",
base / "host_inventory.csv",
)
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
print(f"groups: {groups.nunique()} unique incidents")
print(f"n_features: {len(meta['feature_names'])}")
print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
print(f"X has NaN: {X.isnull().any().any()}")