""" feature_engineering.py ====================== Feature pipeline for the CYB003 baseline classifier. Predicts `execution_phase` (10-class) from per-timestep malware execution telemetry on the CYB003 sample dataset. CSV inputs: malware_samples.csv (primary, one row per timestep, 60 timesteps per sample, 100 samples = 6000 rows) sample_summary.csv (per-sample aggregates; reserved for future work — joining inflates per-sample features across 60 identical replications, which hurt the model in pilot experiments) environment_profiles.csv (reserved for future work) execution_events.csv (reserved for future work) Target classes (10 execution phases observed in the sample): initial_drop, persistence_establishment, privilege_escalation, lateral_movement, payload_execution, data_exfiltration, c2_communication, dormancy_dwell, sandbox_evasion_stall, self_destruct_cleanup This corresponds to the SOC / sandbox-analyst use case: given the malware's current behavioural state, what phase of execution is it in? Useful for dynamic-analysis tools, EDR phase tagging, and behavioural classifiers. The pivot to execution_phase (away from malware_family) happened because malware family classification on n=100 samples with group-aware splitting landed at majority-baseline accuracy (~15%, ROC-AUC ~0.58). execution_phase sits on 6,000 rows of per-timestep data with strong, stable signal across seeds (~91% accuracy, ROC-AUC ~0.98). See the model card for details. Leakage analysis ---------------- No categorical feature has phase->phase purity above 0.17 (uniform random baseline is 0.10), so nothing in the data is an oracle for the target. The model relies on a mix of `timestep` (strong but not deterministic — most phases have tight timestep windows, but `dormancy_dwell`, `sandbox_evasion_stall`, and `self_destruct_cleanup` span the full 0-59 range) and behavioural features. 
Public API ---------- build_features(samples_path) -> (X, y, groups, meta) transform_single(record, meta) -> np.ndarray save_meta(meta, path) / load_meta(path) License ------- Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching the dataset license. See README.md. """ from __future__ import annotations import json from pathlib import Path from typing import Any import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Label space # --------------------------------------------------------------------------- # Alphabetical for stable indexing. LABEL_ORDER = [ "c2_communication", "data_exfiltration", "dormancy_dwell", "initial_drop", "lateral_movement", "payload_execution", "persistence_establishment", "privilege_escalation", "sandbox_evasion_stall", "self_destruct_cleanup", ] LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)} INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()} # --------------------------------------------------------------------------- # Identifier and target columns - not features # --------------------------------------------------------------------------- ID_COLUMNS = ["sample_id", "family_id", "threat_actor_id"] TARGET_COLUMN = "execution_phase" # Note: malware_family is kept as a FEATURE for phase prediction (family # is a useful observable - a SOC analyst knows what family they're looking # at). It's not a leakage source for phase since phase->family purity is # only 0.16. Same logic for threat_actor_tier, ep_stack, target_platform - # these are environmental context, not oracles for phase. 
# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",  # strong but non-deterministic phase signal
    "api_call_rate",
    "registry_write_count",
    "network_connection_count",
    "process_injection_flag",
    "c2_beacon_interval_sec",
    "av_signature_hit_flag",
    "sandbox_evasion_flag",
    "lateral_propagation_count",
    "privilege_escalation_flag",
    # PE static features (constant per sample but informative for phase
    # given that the model sees these alongside per-step behaviour)
    "pe_entropy_mean",
    "pe_entropy_std",
    "import_hash_cluster",
    "section_count",
    "packed_section_ratio",
    "string_entropy_mean",
    "byte_histogram_chi2",
    "code_section_rx_ratio",
    "resource_section_entropy",
    "suspicious_import_count",
    "packer_detected_flag",
]

CATEGORICAL_TIMESTEP_FEATURES = [
    "malware_family",  # kept as feature: phase prediction conditions
                       # on family (a known observable in SOC workflows)
    "threat_actor_tier",
    "target_platform",
    "obfuscation_technique",
    "detection_outcome",
    "ep_stack",
]


# ---------------------------------------------------------------------------
# Engineered features (none derived from phase or timestep alone)
# ---------------------------------------------------------------------------
# Columns appended by _add_engineered_features, in the order they follow
# DIRECT_NUMERIC_TIMESTEP_FEATURES in the numeric part of the matrix.
_ENGINEERED_FEATURES = [
    "api_burst_score",
    "is_c2_active",
    "is_high_net_volume",
    "is_stealth_step",
    "is_destructive_step",
    "lateral_activity_score",
]


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with six engineered columns appended.

    None of the features directly encode phase (that would be a tautology);
    each is a behavioural composite that disambiguates phases sharing
    similar timestep ranges. The input frame is not modified.

    Raises
    ------
    KeyError
        If any raw column read below is absent from `df`.
    """
    df = df.copy()

    # 1. API burst score: high for execution-heavy phases (payload_execution,
    #    privilege_escalation), low for stealth phases (dormancy, evasion).
    #    The registry-write factor is capped at 50 before multiplying.
    df["api_burst_score"] = (
        df["api_call_rate"] * df["registry_write_count"].clip(upper=50)
    ).astype(float)

    # 2. C2 active flag: positive c2_beacon_interval_sec indicates active
    #    beaconing. Strongly correlates with c2_communication phase.
    df["is_c2_active"] = (df["c2_beacon_interval_sec"] > 0).astype(int)

    # 3. High network volume step: above-threshold connection count, common
    #    in lateral_movement, data_exfiltration, c2_communication.
    df["is_high_net_volume"] = (df["network_connection_count"] > 5).astype(int)

    # 4. Stealth indicator: low api_call_rate AND no AV/sandbox hit. Used
    #    to disambiguate dormancy_dwell / sandbox_evasion_stall from active
    #    phases that happen to land in similar timestep windows.
    df["is_stealth_step"] = (
        (df["api_call_rate"] < 5)
        & (df["av_signature_hit_flag"] == 0)
        & (df["sandbox_evasion_flag"] == 0)
    ).astype(int)

    # 5. Destructive action indicator: combines privilege escalation flag
    #    and registry-write count. High in persistence_establishment and
    #    self_destruct_cleanup.
    df["is_destructive_step"] = (
        (df["privilege_escalation_flag"] == 1)
        | (df["registry_write_count"] > 10)
    ).astype(int)

    # 6. Lateral activity: network connections combined with
    #    lateral_propagation count > 0. Distinguishes lateral_movement from
    #    other network phases.
    df["lateral_activity_score"] = (
        df["lateral_propagation_count"] * df["network_connection_count"]
    ).astype(float)

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    samples_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load CSV, drop identifier columns and target, engineer features, one-hot
    encode, return (X, y, groups, meta).

    `groups` is a Series of sample_id values aligned with X. Use it with
    GroupShuffleSplit / GroupKFold: a single sample contains 60 correlated
    timesteps, and row-level random splitting inflates metrics.

    Parameters
    ----------
    samples_path
        Path of the per-timestep CSV; passed straight to `pd.read_csv`.

    Returns
    -------
    X
        Numeric columns followed by one one-hot block per categorical
        column present in the CSV; remaining NaNs are replaced by 0.0.
    y
        Integer class index per row, encoded with LABEL_TO_INT.
    groups
        The `sample_id` column.
    meta
        Dict with `feature_names`, `numeric_features`,
        `categorical_levels`, `label_to_int` and `int_to_label`; this is
        what `transform_single` needs at inference time.

    Raises
    ------
    ValueError
        If the target column holds a value (or a missing value) that does
        not map through LABEL_TO_INT.
    """
    samples = pd.read_csv(samples_path)

    # Extract target + groups before they are dropped from the feature pool.
    y = samples[TARGET_COLUMN].map(LABEL_TO_INT)
    unmapped = y.isna()
    if unmapped.any():
        bad = samples.loc[unmapped, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown execution_phase values: {bad}")
    y = y.astype(int)
    groups = samples["sample_id"].copy()

    # Drop target + identifiers; errors="ignore" tolerates absent ID columns.
    samples = samples.drop(columns=ID_COLUMNS + [TARGET_COLUMN], errors="ignore")

    samples = _add_engineered_features(samples)

    numeric_features = DIRECT_NUMERIC_TIMESTEP_FEATURES + _ENGINEERED_FEATURES
    X_numeric = samples[numeric_features].astype(float)

    # One-hot categoricals. The sorted level list is recorded so that
    # inference can rebuild exactly the same columns.
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_TIMESTEP_FEATURES:
        if col not in samples.columns:
            continue
        levels = sorted(samples[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            samples[col].astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
    }
    return X, y, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """
    Encode a single timestep record (or a frame of records) for inference.

    Parameters
    ----------
    record
        A dict describing one timestep, or a DataFrame with one row per
        timestep. The caller's object is not modified.
    meta
        Metadata as returned by `build_features` / `load_meta`; the keys
        `numeric_features`, `categorical_levels` and `feature_names` are
        read.

    Returns
    -------
    np.ndarray
        float32 array of shape (n_rows, len(meta["feature_names"])), with
        columns in `meta["feature_names"]` order. Numeric or categorical
        columns missing from the record, categorical values not in the
        stored levels, and NaNs all encode as 0.

    Raises
    ------
    KeyError
        If a raw column required by the engineered features is missing.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([dict(record)])
    else:
        # Normalise to a fresh 0..n-1 index. The numeric block below is
        # rebuilt from raw values (so it always has a RangeIndex), whereas
        # the one-hot blocks inherit df's index; without this reset a frame
        # carrying any other index (e.g. a filtered slice) would be
        # outer-joined by pd.concat into extra, zero-filled rows.
        df = record.reset_index(drop=True)

    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
        for col in meta["numeric_features"]
    })

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        # Force exactly one column per stored level, in stored order.
        expected = [f"{col}_{lvl}" for lvl in levels]
        blocks.append(block.reindex(columns=expected, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the feature metadata to `path` as indented JSON.

    Only the five keys produced by `build_features` are written. The keys
    of `int_to_label` are converted to strings, since JSON object keys must
    be strings; `load_meta` converts them back.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Read metadata written by `save_meta` and restore integer keys in
    `int_to_label`.
    """
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


if __name__ == "__main__":
    import sys

    # Optional first CLI argument: directory containing malware_samples.csv.
    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, groups, meta = build_features(base / "malware_samples.csv")
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} samples")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")