"""
feature_engineering.py
======================

Feature pipeline for the CYB001 baseline classifier.

This module produces a flow-level feature matrix and label vector from the
four CSV files distributed with the CYB001 sample dataset on Hugging Face:

    network_flows.csv     (primary, one row per flow)
    session_summary.csv   (one row per session, joined on session_id)
    network_topology.csv  (one row per network segment, joined on segment_id)
    flow_events.csv       (one row per security event - NOT used for v1
                           features; flows lose temporal granularity if
                           aggregated naively. Reserved for future work.)

The pipeline is deliberately written to be read end-to-end. Every dropped
column is dropped with a one-line explanation. Every engineered feature sits
next to a one-sentence motivation. If you are evaluating the CYB001 product,
this file is the feature recipe; what the model "sees" is exactly what this
file emits.

Public API
----------
build_features(flows_path, sessions_path, topology_path) -> (X, y, meta)
    X    : pd.DataFrame - feature matrix, all numeric, no NaNs
    y    : pd.Series    - integer-encoded label
                          (0=BENIGN, 1=MALICIOUS, 2=AMBIGUOUS)
    meta : dict         - {feature_names, numeric_features,
                           categorical_levels, label_to_int, int_to_label}
    The same `meta` dict is used at inference time so a new flow record
    gets encoded identically to training.

transform_single(record, meta) -> np.ndarray
    Encode a single flow record (dict or 1-row DataFrame) for inference.

save_meta(meta, path) / load_meta(path)
    Persist `meta` to JSON and read it back for inference-time reuse.

License
-------
This file ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# Constants - what we keep, what we drop, and why
# ---------------------------------------------------------------------------

LABEL_ORDER = ["BENIGN", "MALICIOUS", "AMBIGUOUS"]  # index 0, 1, 2
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}

# Columns dropped from network_flows.csv because they are ground-truth
# generator metadata, not observables a real IDS would have at inference time.
# Including any of these gives perfect or near-perfect accuracy that does
# not reflect real-world performance.
LEAKY_FLOW_COLUMNS = [
    "traffic_category",          # 100% deterministic of label (attack_*/benign_*/ambiguous_*)
    "attack_subcategory",        # null iff label != MALICIOUS
    "attacker_capability_tier",  # labeled per flow including benign - generator metadata
]

# Identifier / non-feature columns
ID_COLUMNS = [
    "flow_id",
    "session_id",
    "source_ip_hash",
    "destination_ip_hash",   # SHA-256 pseudonyms, not useful as features
    "flow_start_timestamp",  # consumed by the hour_of_day / is_off_hours engineered features
]

# Direct numeric features from network_flows.csv (pass-through)
DIRECT_NUMERIC_FLOW_FEATURES = [
    "source_port",
    "dest_port",
    "flow_duration_ms",
    "total_fwd_packets",
    "total_bwd_packets",
    "total_bytes_fwd",
    "total_bytes_bwd",
    "fwd_packet_len_mean",
    "fwd_packet_len_std",
    "bwd_packet_len_mean",
    "bwd_packet_len_std",
    "flow_bytes_per_sec",
    "flow_packets_per_sec",
    "inter_arrival_time_mean",
    "inter_arrival_time_std",
    "tcp_flag_syn_count",
    "tcp_flag_ack_count",
    "tcp_flag_fin_count",
    "tcp_flag_rst_count",
    "tcp_flag_psh_count",
    "tcp_flag_urg_count",
    "retransmission_flag",
    "fragmentation_flag",
    "protocol_violation_flag",
]

# Session-level numeric features (joined on session_id).
# Selected after a per-label leakage audit:
#   KEEP: payload_entropy_mean, retransmission_rate, protocol_violation_count,
#         c2_beacon_flag, session_risk_score (overlapping distributions across labels)
#   DROP: exfil_volume_bytes, scan_probe_count, lateral_move_flag
#         (zero for all BENIGN/AMBIGUOUS - generator oracles, not detector outputs)
SESSION_FEATURES_KEEP = [
    "payload_entropy_mean",
    "retransmission_rate",
    "protocol_violation_count",
    "c2_beacon_flag",
    "session_risk_score",
]

# Topology-level numeric features (joined on segment_id)
TOPOLOGY_NUMERIC_FEATURES = [
    "trust_level",
    "avg_concurrent_flows",
    "bandwidth_mbps",
    "nat_enabled",
    "ids_coverage",
    "diurnal_peak_factor",
    "feature_space_dim",
    "alert_threshold",
    "retraining_cadence_days",
    "ensemble_size",
    "device_count",
]

# Categorical columns that get one-hot encoded, as (column, source table)
CATEGORICAL_FEATURES = [
    ("protocol", "flows"),               # TCP / UDP / HTTPS / DNS / SMTP / SSH / FTP / NTP
    ("flow_lifecycle_phase", "flows"),   # initiation / handshake / transfer / ...
    ("source_device_type", "flows"),     # workstation / server / iot / mobile / cloud / ot
    ("dest_device_type", "flows"),
    ("segment_type", "topology"),        # corporate_lan / dmz / cloud_workload / ...
    ("firewall_policy", "topology"),
    ("qos_policy", "topology"),
    ("defender_architecture", "topology"),
]


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------

# Columns added by _add_engineered_features, in the order they enter X.
ENGINEERED_FEATURES = [
    "iat_cv",
    "fwd_bwd_byte_ratio",
    "bytes_per_packet_fwd",
    "tcp_flag_anomaly_score",
    "payload_density",
    "hour_of_day",
    "is_off_hours",
    "is_well_known_dest_port",
    "is_ephemeral_src_port",
]


def _safe_divide(num: pd.Series, denom: pd.Series, fill: float = 0.0) -> pd.Series:
    """Element-wise divide, replacing inf/nan from div-by-zero with `fill`."""
    out = num / denom.replace(0, np.nan)
    return out.replace([np.inf, -np.inf], np.nan).fillna(fill)


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with the nine ENGINEERED_FEATURES columns added.

    The features encode domain hypotheses about how each label class
    behaves. These are NOT learned; they are stated by hand so a buyer can
    read this function and see what the model is told to look at. Tree
    models can recover most of these on their own, but giving them
    explicitly improves both XGBoost convergence and MLP performance.

    `df` must contain the raw flow columns read below (packet/byte counts,
    inter-arrival statistics, RST/URG/FIN flag counts, ports and
    `flow_start_timestamp`); a missing column raises KeyError.
    """
    df = df.copy()

    # IAT coefficient of variation. Low cv => regular inter-arrival times
    # => C2 beacon signature (the dataset is calibrated to cv ~= 0.065 for
    # APT beacons, regularity score ~= 0.93 per the README).
    df["iat_cv"] = _safe_divide(
        df["inter_arrival_time_std"], df["inter_arrival_time_mean"]
    )

    # Forward/backward byte ratio. >> 1 indicates upload-heavy flow, which
    # is the exfiltration signature.
    df["fwd_bwd_byte_ratio"] = _safe_divide(
        df["total_bytes_fwd"], df["total_bytes_bwd"]
    )

    # Bytes per packet (forward direction). Combined with packet length
    # std, separates streaming traffic from short-message protocols.
    df["bytes_per_packet_fwd"] = _safe_divide(
        df["total_bytes_fwd"], df["total_fwd_packets"]
    )

    # TCP flag anomaly score: RST + URG + FIN counts relative to total
    # packets. High values indicate scan/probe or protocol misuse.
    total_packets = df["total_fwd_packets"] + df["total_bwd_packets"]
    flag_total = (
        df["tcp_flag_rst_count"]
        + df["tcp_flag_urg_count"]
        + df["tcp_flag_fin_count"]
    )
    df["tcp_flag_anomaly_score"] = _safe_divide(flag_total, total_packets)

    # Payload density. Bytes per packet, normalized to MTU. Low density on
    # high packet counts indicates beaconing or keep-alive.
    total_bytes = df["total_bytes_fwd"] + df["total_bytes_bwd"]
    df["payload_density"] = _safe_divide(total_bytes, total_packets * 1500)

    # Hour of day from timestamp. Off-hours bias is calibrated into the
    # APT and insider-threat tiers. Unparseable timestamps fall back to
    # noon so they never count as off-hours.
    ts = pd.to_datetime(df["flow_start_timestamp"], errors="coerce")
    hour = ts.dt.hour.fillna(12).astype(int)
    df["hour_of_day"] = hour
    df["is_off_hours"] = ((hour < 6) | (hour > 22)).astype(int)

    # Port observables. Well-known ports < 1024, ephemeral ports >= 49152.
    df["is_well_known_dest_port"] = (df["dest_port"] < 1024).astype(int)
    df["is_ephemeral_src_port"] = (df["source_port"] >= 49152).astype(int)

    return df


def _one_hot(values: pd.Series, column: str, levels: list[str]) -> pd.DataFrame:
    """
    One-hot encode `values` into exactly one int column per entry of `levels`.

    Columns are named f"{column}_{level}" and come out in `levels` order.
    Values that are missing or not in `levels` encode to an all-zero row.
    The result keeps the index of `values`.
    """
    categorical = values.astype("category").cat.set_categories(levels)
    block = pd.get_dummies(categorical, prefix=column, dummy_na=False).astype(int)
    expected = [f"{column}_{lvl}" for lvl in levels]
    return block.reindex(columns=expected, fill_value=0)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    flows_path: str | Path,
    sessions_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, dict[str, Any]]:
    """
    Load the three CSVs, join them, drop leaky columns, engineer features,
    one-hot encode categoricals, and return (X, y, meta).

    The returned `meta` dict captures the column order and the categorical
    level set, which is what `transform_single` needs at inference time to
    encode a new record identically.

    Raises:
        ValueError: if the `label` column holds a value outside LABEL_ORDER
            (including missing labels).
        pandas.errors.MergeError: if session_summary.csv or
            network_topology.csv has duplicate join keys, which would
            silently duplicate flow rows.
    """
    flows = pd.read_csv(flows_path)
    sessions = pd.read_csv(sessions_path)
    topology = pd.read_csv(topology_path)

    # Drop columns that leak the label (see LEAKY_FLOW_COLUMNS for rationale)
    flows = flows.drop(columns=LEAKY_FLOW_COLUMNS, errors="ignore")

    # Join session-level aggregates. many_to_one guards the row count: each
    # flow must match at most one session row.
    df = flows.merge(
        sessions[["session_id"] + SESSION_FEATURES_KEEP],
        on="session_id",
        how="left",
        validate="many_to_one",
    )

    # Join topology features (numeric + categorical)
    topo_cols = ["segment_id"] + TOPOLOGY_NUMERIC_FEATURES + [
        col for col, src in CATEGORICAL_FEATURES if src == "topology"
    ]
    df = df.merge(
        topology[topo_cols],
        on="segment_id",
        how="left",
        validate="many_to_one",
    )

    # Extract labels before adding features. Fail loudly on anything we
    # cannot encode instead of surfacing an opaque int-cast error.
    encoded = df["label"].map(LABEL_TO_INT)
    unmapped = encoded.isna()
    if unmapped.any():
        bad = sorted(df.loc[unmapped, "label"].astype(str).unique())
        raise ValueError(
            f"Unrecognized or missing label values {bad}; "
            f"expected one of {LABEL_ORDER}"
        )
    y = encoded.astype(int)

    # Engineered features
    df = _add_engineered_features(df)

    # Assemble feature columns
    numeric_features = (
        DIRECT_NUMERIC_FLOW_FEATURES
        + SESSION_FEATURES_KEEP
        + TOPOLOGY_NUMERIC_FEATURES
        + ENGINEERED_FEATURES
    )
    X_numeric = df[numeric_features].astype(float)

    # One-hot encode categoricals. Record the level set in `meta` so we can
    # reproduce the same columns at inference time even if a new record
    # contains an unseen level (it will encode to all-zero, which is the
    # correct fallback for one-hot).
    categorical_levels: dict[str, list[str]] = {}
    one_hot_blocks: list[pd.DataFrame] = []
    for col, _src in CATEGORICAL_FEATURES:
        levels = sorted(df[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        one_hot_blocks.append(_one_hot(df[col], col, levels))

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in one_hot_blocks],
        axis=1,
    )

    # Final NaN sweep (defensive - session join can introduce NaN if a
    # session_id is missing from session_summary.csv).
    X = X.fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
    }
    return X, y, meta


def transform_single(record: dict | pd.DataFrame, meta: dict[str, Any]) -> np.ndarray:
    """
    Encode a single flow record for inference.

    `record` must contain the same columns as network_flows.csv (minus the
    leaky columns), plus the joined session and topology fields. If you only
    have the flow row, you must look up the matching session_summary row and
    network_topology row and merge them into `record` before calling this.

    Numeric or categorical columns named in `meta` but absent from `record`
    encode as zeros. The caller's DataFrame index is ignored, so a row
    sliced out of a larger frame encodes the same as a fresh one.

    Returns a (1, n_features) numpy array ready for model.predict_proba
    (one output row per input row if a multi-row DataFrame is passed).
    """
    frame = pd.DataFrame([record]) if isinstance(record, dict) else record.copy()
    # Normalize to a 0..n-1 index so every block below aligns row-for-row in
    # the axis=1 concat; otherwise a non-zero index yields extra rows.
    frame = frame.reset_index(drop=True)
    frame = _add_engineered_features(frame)
    n_rows = len(frame)

    # Numeric features in fixed order; columns missing from the record are
    # created as 0.0.
    numeric = frame.reindex(
        columns=meta["numeric_features"], fill_value=0.0
    ).astype(float)

    # One-hot blocks in fixed order, using the levels seen at fit time
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        if col in frame.columns:
            values = frame[col]
        else:
            values = pd.Series([None] * n_rows, dtype=object)
        blocks.append(_one_hot(values, col, levels))

    X = pd.concat(blocks, axis=1).fillna(0.0)

    # Reorder to match training column order exactly
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.to_numpy(dtype=np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Persist meta to JSON for inference-time reuse."""
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        # JSON object keys must be strings; load_meta converts them back.
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Load meta from JSON, restoring the integer keys of `int_to_label`."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


if __name__ == "__main__":
    # Smoke test: pass the dataset directory as the first CLI argument.
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, meta = build_features(
        base / "network_flows.csv",
        base / "session_summary.csv",
        base / "network_topology.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X dtypes unique: {X.dtypes.unique()}")
    print(f"X has NaN: {X.isnull().any().any()}")