# cyb003-baseline-classifier / feature_engineering.py
# pradeep-xpert's picture
# Initial release: XGBoost + MLP for malware execution phase classification
# c6a80e7 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB003 baseline classifier.
Predicts `execution_phase` (10-class) from per-timestep malware execution
telemetry on the CYB003 sample dataset.
CSV inputs:
malware_samples.csv (primary, one row per timestep, 60 timesteps
per sample, 100 samples = 6000 rows)
sample_summary.csv (per-sample aggregates; reserved for future
work — joining inflates per-sample features
across 60 identical replications, which hurt
the model in pilot experiments)
environment_profiles.csv (reserved for future work)
execution_events.csv (reserved for future work)
Target classes (10 execution phases observed in the sample):
initial_drop, persistence_establishment, privilege_escalation,
lateral_movement, payload_execution, data_exfiltration,
c2_communication, dormancy_dwell, sandbox_evasion_stall,
self_destruct_cleanup
This corresponds to the SOC / sandbox-analyst use case: given the malware's
current behavioural state, what phase of execution is it in? Useful for
dynamic-analysis tools, EDR phase tagging, and behavioural classifiers.
The pivot to execution_phase (away from malware_family) happened because
malware family classification on n=100 samples with group-aware splitting
landed at majority-baseline accuracy (~15%, ROC-AUC ~0.58). execution_phase
sits on 6,000 rows of per-timestep data with strong, stable signal across
seeds (~91% accuracy, ROC-AUC ~0.98). See the model card for details.
Leakage analysis
----------------
No categorical feature has feature->phase purity above 0.17 (uniform random
baseline is 0.10), so nothing in the data is an oracle for the target.
The model relies on a mix of `timestep` (strong but not deterministic —
most phases have tight timestep windows, but `dormancy_dwell`,
`sandbox_evasion_stall`, and `self_destruct_cleanup` span the full
0-59 range) and behavioural features.
Public API
----------
build_features(samples_path) -> (X, y, groups, meta)
transform_single(record, meta) -> np.ndarray
save_meta(meta, path) / load_meta(path)
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching
the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Alphabetical for stable indexing.
# The ten execution phases, kept in alphabetical order so that a label's
# position in this list is its integer class id.
LABEL_ORDER = [
    "c2_communication",
    "data_exfiltration",
    "dormancy_dwell",
    "initial_drop",
    "lateral_movement",
    "payload_execution",
    "persistence_establishment",
    "privilege_escalation",
    "sandbox_evasion_stall",
    "self_destruct_cleanup",
]
# Forward (name -> id) and reverse (id -> name) lookups derived from the list.
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
# ---------------------------------------------------------------------------
# Identifier and target columns - not features
# ---------------------------------------------------------------------------
# Identifier columns: removed from the feature pool by build_features
# (sample_id is additionally returned as the grouping key).
ID_COLUMNS = [
    "sample_id",
    "family_id",
    "threat_actor_id",
]
# Name of the column holding the phase label to predict.
TARGET_COLUMN = "execution_phase"
# Note: malware_family is kept as a FEATURE for phase prediction (family
# is a useful observable - a SOC analyst knows what family they're looking
# at). It's not a leakage source for phase since phase->family purity is
# only 0.16. Same logic for threat_actor_tier, ep_stack, target_platform -
# these are environmental context, not oracles for phase.
# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
# Behavioural telemetry recorded at each timestep. `timestep` itself is a
# strong but non-deterministic phase signal.
_BEHAVIOURAL_NUMERIC_FEATURES = [
    "timestep",
    "api_call_rate",
    "registry_write_count",
    "network_connection_count",
    "process_injection_flag",
    "c2_beacon_interval_sec",
    "av_signature_hit_flag",
    "sandbox_evasion_flag",
    "lateral_propagation_count",
    "privilege_escalation_flag",
]
# PE static features: constant per sample, but informative for phase given
# that the model sees them alongside the per-step behaviour.
_PE_STATIC_NUMERIC_FEATURES = [
    "pe_entropy_mean",
    "pe_entropy_std",
    "import_hash_cluster",
    "section_count",
    "packed_section_ratio",
    "string_entropy_mean",
    "byte_histogram_chi2",
    "code_section_rx_ratio",
    "resource_section_entropy",
    "suspicious_import_count",
    "packer_detected_flag",
]
# Raw numeric columns passed straight through to the model, behavioural
# columns first, then the PE static columns.
DIRECT_NUMERIC_TIMESTEP_FEATURES = (
    _BEHAVIOURAL_NUMERIC_FEATURES + _PE_STATIC_NUMERIC_FEATURES
)
# Columns that get one-hot encoded. `malware_family` is deliberately kept as
# a feature: phase prediction conditions on family, which is a known
# observable in SOC workflows.
CATEGORICAL_TIMESTEP_FEATURES = [
    "malware_family",
    "threat_actor_tier",
    "target_platform",
    "obfuscation_technique",
    "detection_outcome",
    "ep_stack",
]
# ---------------------------------------------------------------------------
# Engineered features (none derived from phase or timestep alone)
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Six engineered features. None directly encode phase (that would be
a tautology); each is a behavioural composite that disambiguates
phases sharing similar timestep ranges.
"""
df = df.copy()
# 1. API burst score: high for execution-heavy phases (payload_execution,
# privilege_escalation), low for stealth phases (dormancy, evasion).
df["api_burst_score"] = (
df["api_call_rate"] * df["registry_write_count"].clip(upper=50)
).astype(float)
# 2. C2 active flag: positive c2_beacon_interval_sec indicates active
# beaconing. Strongly correlates with c2_communication phase.
df["is_c2_active"] = (df["c2_beacon_interval_sec"] > 0).astype(int)
# 3. High network volume step: above-threshold connection count, common
# in lateral_movement, data_exfiltration, c2_communication.
df["is_high_net_volume"] = (df["network_connection_count"] > 5).astype(int)
# 4. Stealth indicator: low api_call_rate AND no AV/sandbox hit. Used
# to disambiguate dormancy_dwell / sandbox_evasion_stall from active
# phases that happen to land in similar timestep windows.
df["is_stealth_step"] = (
(df["api_call_rate"] < 5)
& (df["av_signature_hit_flag"] == 0)
& (df["sandbox_evasion_flag"] == 0)
).astype(int)
# 5. Destructive action indicator: combines privilege escalation flag
# and registry-write count. High in persistence_establishment and
# self_destruct_cleanup.
df["is_destructive_step"] = (
(df["privilege_escalation_flag"] == 1)
| (df["registry_write_count"] > 10)
).astype(int)
# 6. Lateral activity: network connections combined with lateral_propagation
# count > 0. Distinguishes lateral_movement from other network phases.
df["lateral_activity_score"] = (
df["lateral_propagation_count"] * df["network_connection_count"]
).astype(float)
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    samples_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Read the per-timestep CSV and turn it into a model-ready design matrix.

    Steps: encode the target, pull out the grouping key, drop identifier
    and target columns, add the engineered features, then one-hot encode
    the categorical columns that are present.

    Returns
    -------
    X : DataFrame
        Numeric features followed by one-hot blocks; NaNs replaced by 0.0.
    y : Series
        Integer class ids per LABEL_TO_INT.
    groups : Series
        The `sample_id` of each row. Use it with GroupShuffleSplit /
        GroupKFold: a single sample contains 60 correlated timesteps, and
        row-level random splitting inflates metrics.
    meta : dict
        Everything needed to re-encode records at inference time: column
        order, numeric feature names, per-column category levels and the
        label maps.

    Raises
    ------
    ValueError
        If the target column holds a value that is not in LABEL_TO_INT
        (missing values are reported the same way).
    """
    raw = pd.read_csv(samples_path)

    # Target and grouping key are taken before the columns are dropped.
    encoded_target = raw[TARGET_COLUMN].map(LABEL_TO_INT)
    unmapped = encoded_target.isna()
    if unmapped.any():
        bad = raw.loc[unmapped, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown execution_phase values: {bad}")
    y = encoded_target.astype(int)
    groups = raw["sample_id"].copy()

    # Feature pool: everything except identifiers and the target, plus the
    # engineered composites.
    feature_pool = _add_engineered_features(
        raw.drop(columns=ID_COLUMNS + [TARGET_COLUMN], errors="ignore")
    )

    numeric_features = [
        *DIRECT_NUMERIC_TIMESTEP_FEATURES,
        "api_burst_score",
        "is_c2_active",
        "is_high_net_volume",
        "is_stealth_step",
        "is_destructive_step",
        "lateral_activity_score",
    ]
    parts: list[pd.DataFrame] = [
        feature_pool[numeric_features].astype(float).reset_index(drop=True)
    ]

    # One-hot encode each categorical column that exists in the data,
    # remembering the sorted level list so inference can rebuild the same
    # columns in the same order.
    categorical_levels: dict[str, list[str]] = {}
    for col in CATEGORICAL_TIMESTEP_FEATURES:
        if col in feature_pool.columns:
            levels = sorted(feature_pool[col].dropna().unique().tolist())
            categorical_levels[col] = levels
            as_category = feature_pool[col].astype("category").cat.set_categories(levels)
            one_hot = pd.get_dummies(as_category, prefix=col, dummy_na=False).astype(int)
            parts.append(one_hot.reset_index(drop=True))

    X = pd.concat(parts, axis=1).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
    }
    return X, y, groups, meta
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """
    Encode one timestep record (or a frame of records) for inference.

    Parameters
    ----------
    record : dict or DataFrame
        A single record as a mapping of column name to value, or a
        DataFrame with one row per record. The input is not modified.
        A DataFrame's index is ignored; output rows follow the input's
        row order.
    meta : dict
        Metadata as produced by build_features / load_meta. Uses
        "numeric_features", "categorical_levels" and "feature_names".

    Returns
    -------
    np.ndarray
        float32 array of shape (n_records, len(meta["feature_names"])),
        columns ordered as in meta["feature_names"].

    Notes
    -----
    * The raw columns read by `_add_engineered_features` must be present;
      a missing one raises KeyError there.
    * Any other numeric feature missing from the input is filled with 0.0.
    * A missing categorical column, or a level not listed in `meta`,
      encodes as all zeros for that column's one-hot block.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Drop the caller's index. The numeric block below is rebuilt from
        # raw values (fresh 0..n-1 index) while the one-hot blocks inherit
        # df's index; with a non-default index (e.g. a slice of a larger
        # frame) the column-wise concat would outer-join the two and emit
        # extra, half-empty rows. reset_index returns a new frame, so the
        # caller's DataFrame is untouched.
        df = record.reset_index(drop=True)
    df = _add_engineered_features(df)

    n_rows = len(df)
    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * n_rows, index=df.index)).astype(float).values
        for col in meta["numeric_features"]
    })
    blocks: list[pd.DataFrame] = [numeric]

    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows, index=df.index))
        # Restricting to the training levels turns unseen values into NaN,
        # which dummy_na=False then encodes as an all-zero row.
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per training level, in training order.
        expected = [f"{col}_{lvl}" for lvl in levels]
        block = block.reindex(columns=expected, fill_value=0)
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final guard: exact training column order, zero-filling anything absent.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the feature metadata to `path` as indented JSON.

    Only the five known keys are persisted; anything else in `meta` is
    dropped. The integer keys of "int_to_label" are written as strings,
    since JSON object keys must be strings (load_meta converts them back).

    Raises KeyError if one of the five keys is missing from `meta`.
    """
    passthrough_keys = (
        "feature_names",
        "numeric_features",
        "categorical_levels",
        "label_to_int",
    )
    payload = {key: meta[key] for key in passthrough_keys}
    payload["int_to_label"] = {
        str(class_id): label for class_id, label in meta["int_to_label"].items()
    }
    with open(path, "w") as sink:
        json.dump(payload, sink, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Read feature metadata from the JSON file at `path`.

    JSON stores object keys as strings, so the keys of "int_to_label" are
    converted back to int before the dict is returned. All other entries
    are returned exactly as parsed.

    Raises KeyError if the file has no "int_to_label" entry.
    """
    loaded = json.loads(Path(path).read_text())
    restored = {}
    for class_id, label in loaded["int_to_label"].items():
        restored[int(class_id)] = label
    loaded["int_to_label"] = restored
    return loaded
if __name__ == "__main__":
    # Smoke test: build the feature matrix from a data directory (first CLI
    # argument, or the default upload location) and print a short summary.
    import sys

    cli_args = sys.argv[1:]
    data_dir = Path(cli_args[0]) if cli_args else Path("/mnt/user-data/uploads")
    X, y, groups, meta = build_features(data_dir / "malware_samples.csv")

    # Shapes and sizes first ...
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} samples")
    print(f"n features: {len(meta['feature_names'])}")
    # ... then class balance and a NaN sanity check.
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")