| """ |
| feature_engineering.py |
| ====================== |
| |
| Feature pipeline for the CYB001 baseline classifier. |
| |
| This module produces a flow-level feature matrix and label vector from the |
| four CSV files distributed with the CYB001 sample dataset on Hugging Face: |
| |
| network_flows.csv (primary, one row per flow) |
| session_summary.csv (one row per session, joined on session_id) |
| network_topology.csv (one row per network segment, joined on segment_id) |
| flow_events.csv (one row per security event - NOT used for v1 |
| features; flows lose temporal granularity if |
| aggregated naively. Reserved for future work.) |
| |
| The pipeline is deliberately written to be read end-to-end. Every dropped |
| column is dropped with a one-line explanation. Every engineered feature |
| sits next to a one-sentence motivation. If you are evaluating the CYB001 |
| product, this file is the feature recipe; what the model "sees" is exactly |
| what this file emits. |
| |
| Public API |
| ---------- |
| build_features(flows_path, sessions_path, topology_path) -> (X, y, meta) |
| |
| X : pd.DataFrame - feature matrix, all numeric, no NaNs |
| y : pd.Series - integer-encoded label (0=BENIGN, 1=MALICIOUS, 2=AMBIGUOUS) |
    meta : dict - {feature_names, numeric_features, categorical_levels,
                   label_to_int, int_to_label}
| |
| The same `meta` dict is used at inference time so a new flow record gets |
| encoded identically to training. |
| |
| transform_single(record, meta) -> np.ndarray |
| |
| Encode a single flow record (dict or 1-row DataFrame) for inference. |
| |
| License |
| ------- |
| This file ships with the public model on Hugging Face under CC-BY-NC-4.0, |
| matching the dataset license. See README.md. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
| |
| |
|
|
# Integer encoding of the target: a class's position in LABEL_ORDER is its code.
LABEL_ORDER = ["BENIGN", "MALICIOUS", "AMBIGUOUS"]
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
LABEL_TO_INT = {label: code for code, label in INT_TO_LABEL.items()}
|
|
| |
| |
| |
| |
# Flow columns treated as label leakage. build_features drops them from the
# flows table before any join (errors="ignore", so absent columns are fine).
# They describe the traffic/attack class itself, so a model that saw them
# would presumably be reading the answer rather than the traffic.
LEAKY_FLOW_COLUMNS = [
    "traffic_category",
    "attack_subcategory",
    "attacker_capability_tier",
]
|
|
| |
# Identifier / bookkeeping columns that must never become features.
# build_features selects features by explicit allow-list, so these never
# reach X; this list only documents the exclusion and is not referenced by
# the feature-building code below. Note that session_id is still used as a
# join key, and flow_start_timestamp is used indirectly (hour_of_day and
# is_off_hours are derived from it) even though its raw value is excluded.
ID_COLUMNS = [
    "flow_id",
    "session_id",
    "source_ip_hash",
    "destination_ip_hash",
    "flow_start_timestamp",
]
|
|
| |
# Raw network_flows.csv columns copied into X unchanged (cast to float).
# Order matters: it fixes the leading columns of the feature matrix.
DIRECT_NUMERIC_FLOW_FEATURES = [
    # endpoints
    "source_port",
    "dest_port",
    # size and duration
    "flow_duration_ms",
    "total_fwd_packets",
    "total_bwd_packets",
    "total_bytes_fwd",
    "total_bytes_bwd",
    # packet-length statistics
    "fwd_packet_len_mean",
    "fwd_packet_len_std",
    "bwd_packet_len_mean",
    "bwd_packet_len_std",
    # rates
    "flow_bytes_per_sec",
    "flow_packets_per_sec",
    # timing
    "inter_arrival_time_mean",
    "inter_arrival_time_std",
    # TCP flag counts
    "tcp_flag_syn_count",
    "tcp_flag_ack_count",
    "tcp_flag_fin_count",
    "tcp_flag_rst_count",
    "tcp_flag_psh_count",
    "tcp_flag_urg_count",
    # per-flow anomaly indicators
    "retransmission_flag",
    "fragmentation_flag",
    "protocol_violation_flag",
]
|
|
| |
| |
| |
| |
| |
| |
# The only session_summary.csv columns pulled in. build_features left-joins
# them onto every flow on session_id, so all flows of a session share these
# values. Flows whose session has no summary row get 0.0 after the final
# fillna.
# NOTE(review): confirm c2_beacon_flag and session_risk_score are computed
# without access to ground truth; if either is derived from the label it
# leaks exactly like LEAKY_FLOW_COLUMNS.
SESSION_FEATURES_KEEP = [
    "payload_entropy_mean",
    "retransmission_rate",
    "protocol_violation_count",
    "c2_beacon_flag",
    "session_risk_score",
]
|
|
| |
# Numeric per-segment columns from network_topology.csv, left-joined onto
# flows on segment_id and copied into X as floats.
# NOTE(review): feature_space_dim, alert_threshold, retraining_cadence_days
# and ensemble_size read like defender-model configuration rather than
# network properties; confirm they are legitimate inputs, not label proxies.
TOPOLOGY_NUMERIC_FEATURES = [
    "trust_level",
    "avg_concurrent_flows",
    "bandwidth_mbps",
    "nat_enabled",
    "ids_coverage",
    "diurnal_peak_factor",
    "feature_space_dim",
    "alert_threshold",
    "retraining_cadence_days",
    "ensemble_size",
    "device_count",
]
|
|
| |
# (column, source table) pairs that build_features one-hot encodes. The
# source tag matters: the "topology" entries decide which extra
# network_topology.csv columns are joined in; "flows" entries are already
# present on the flow rows.
CATEGORICAL_FEATURES = [
    # from network_flows.csv
    ("protocol", "flows"),
    ("flow_lifecycle_phase", "flows"),
    ("source_device_type", "flows"),
    ("dest_device_type", "flows"),
    # from network_topology.csv (joined on segment_id)
    ("segment_type", "topology"),
    ("firewall_policy", "topology"),
    ("qos_policy", "topology"),
    ("defender_architecture", "topology"),
]
|
|
|
|
| |
| |
| |
|
|
def _safe_divide(num: pd.Series, denom: pd.Series, fill: float = 0.0) -> pd.Series:
    """
    Return ``num / denom`` element-wise, with every non-finite cell set to `fill`.

    Zero denominators, NaN operands and +/-inf results all come back as
    `fill`, so callers get a column that is safe to feed straight to a model.
    """
    # Turning zeros into NaN up front means x/0 can never produce inf.
    guarded_denominator = denom.replace(0, np.nan)
    ratio = num / guarded_denominator
    # An infinite numerator can still yield inf; treat it like a missing value.
    non_finite = [np.inf, -np.inf]
    return ratio.replace(non_finite, np.nan).fillna(fill)
|
|
|
|
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with nine hand-written engineered features appended.

    The features encode domain hypotheses about how each label class behaves.
    They are NOT learned; they are stated by hand so a buyer can read this
    function and see what the model is told to look at. Tree models can
    recover most of these on their own, but giving them explicitly improves
    both XGBoost convergence and MLP performance.

    Added columns (in this order): iat_cv, fwd_bwd_byte_ratio,
    bytes_per_packet_fwd, tcp_flag_anomaly_score, payload_density,
    hour_of_day, is_off_hours, is_well_known_dest_port, is_ephemeral_src_port.

    Every ratio goes through `_safe_divide`, so a zero denominator, a NaN
    operand or a non-finite result yields 0.0 instead of inf/NaN.

    Parameters
    ----------
    df : pd.DataFrame
        Joined flow rows containing the raw columns read below. Not modified.

    Raises
    ------
    KeyError
        If a required raw column is missing.
    """
    df = df.copy()

    # Coefficient of variation of inter-arrival times (std / mean). Low values
    # mean very regular packet timing (presumably aimed at machine-generated,
    # beacon-like traffic).
    df["iat_cv"] = _safe_divide(df["inter_arrival_time_std"],
                                df["inter_arrival_time_mean"])

    # Direction asymmetry: forward bytes per backward byte.
    df["fwd_bwd_byte_ratio"] = _safe_divide(df["total_bytes_fwd"],
                                            df["total_bytes_bwd"])

    # Average forward packet size in bytes.
    df["bytes_per_packet_fwd"] = _safe_divide(df["total_bytes_fwd"],
                                              df["total_fwd_packets"])

    total_packets = df["total_fwd_packets"] + df["total_bwd_packets"]

    # RST + URG + FIN flag counts relative to all packets in the flow.
    odd_flag_total = (df["tcp_flag_rst_count"] + df["tcp_flag_urg_count"]
                      + df["tcp_flag_fin_count"])
    df["tcp_flag_anomaly_score"] = _safe_divide(odd_flag_total, total_packets)

    # Total bytes relative to 1500 bytes per packet (1500 is presumably the
    # Ethernet MTU, i.e. a "full" packet; NOTE(review): confirm).
    total_bytes = df["total_bytes_fwd"] + df["total_bytes_bwd"]
    df["payload_density"] = _safe_divide(total_bytes, total_packets * 1500)

    # Hour of flow start; unparseable or missing timestamps default to 12.
    ts = pd.to_datetime(df["flow_start_timestamp"], errors="coerce")
    hour = ts.dt.hour.fillna(12).astype(int)
    df["hour_of_day"] = hour
    # Off-hours = hours 0-5 and 23. NOTE(review): hour 22 counts as on-hours;
    # if `hour >= 22` was intended, change it only together with a retrain,
    # since already-trained models have seen the current definition.
    df["is_off_hours"] = ((hour < 6) | (hour > 22)).astype(int)

    # Port buckets: well-known destination port (< 1024) and a source port in
    # the IANA dynamic/ephemeral range (>= 49152).
    df["is_well_known_dest_port"] = (df["dest_port"] < 1024).astype(int)
    df["is_ephemeral_src_port"] = (df["source_port"] >= 49152).astype(int)

    return df
|
|
|
|
| |
| |
| |
|
|
def _encode_labels(labels: pd.Series, mapping: dict[str, int]) -> pd.Series:
    """
    Map string class labels to integer codes using `mapping`.

    Raises ValueError naming the offending values if any label (including a
    missing one) is not a key of `mapping`. A bare ``.map(...).astype(int)``
    would instead fail with an opaque NaN-to-integer cast error.
    """
    codes = labels.map(mapping)
    unknown = labels[codes.isna()]
    if not unknown.empty:
        bad_values = sorted({str(value) for value in unknown.unique()})
        raise ValueError(
            f"{len(unknown)} row(s) have unrecognised label value(s) "
            f"{bad_values}; expected one of {list(mapping)}"
        )
    return codes.astype(int)


def build_features(
    flows_path: str | Path,
    sessions_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, dict[str, Any]]:
    """
    Load the three CSVs, join them, drop leaky columns, engineer features,
    one-hot encode categoricals, and return (X, y, meta).

    The returned `meta` dict captures the column order and the categorical
    level set, which is what `transform_single` needs at inference time to
    encode a new record identically.

    Returns
    -------
    X : pd.DataFrame
        Numeric features, then engineered features, then one-hot blocks, with
        NaN (e.g. flows with no matching session/segment row) filled with 0.0.
    y : pd.Series
        Integer label codes from LABEL_TO_INT, aligned row-for-row with X.
    meta : dict
        Keys: feature_names, numeric_features, categorical_levels,
        label_to_int, int_to_label.

    Raises
    ------
    ValueError
        If the `label` column contains a value outside LABEL_ORDER.
    pandas.errors.MergeError
        If session_summary.csv repeats a session_id or network_topology.csv
        repeats a segment_id. Both joins must be many-to-one; otherwise every
        flow of a duplicated key would be silently duplicated.
    """
    flows = pd.read_csv(flows_path)
    sessions = pd.read_csv(sessions_path)
    topology = pd.read_csv(topology_path)

    # Remove label-derived columns before anything else can see them.
    flows = flows.drop(columns=LEAKY_FLOW_COLUMNS, errors="ignore")

    # Attach per-session aggregates (one summary row per session).
    df = flows.merge(
        sessions[["session_id", *SESSION_FEATURES_KEEP]],
        on="session_id",
        how="left",
        validate="many_to_one",
    )

    # Attach per-segment numeric and categorical attributes.
    topology_categoricals = [col for col, src in CATEGORICAL_FEATURES if src == "topology"]
    topo_cols = ["segment_id", *TOPOLOGY_NUMERIC_FEATURES, *topology_categoricals]
    df = df.merge(
        topology[topo_cols],
        on="segment_id",
        how="left",
        validate="many_to_one",
    )

    y = _encode_labels(df["label"], LABEL_TO_INT).reset_index(drop=True)

    df = _add_engineered_features(df)

    engineered_features = [
        "iat_cv", "fwd_bwd_byte_ratio", "bytes_per_packet_fwd",
        "tcp_flag_anomaly_score", "payload_density",
        "hour_of_day", "is_off_hours",
        "is_well_known_dest_port", "is_ephemeral_src_port",
    ]
    numeric_features = (
        DIRECT_NUMERIC_FLOW_FEATURES
        + SESSION_FEATURES_KEEP
        + TOPOLOGY_NUMERIC_FEATURES
        + engineered_features
    )

    X_numeric = df[numeric_features].astype(float)

    # One-hot encode categoricals against the sorted set of observed levels.
    # The level lists are stored in meta so inference emits the same columns.
    categorical_levels: dict[str, list[str]] = {}
    one_hot_blocks: list[pd.DataFrame] = []
    for col, _src in CATEGORICAL_FEATURES:
        levels = sorted(df[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            df[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        one_hot_blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in one_hot_blocks],
        axis=1,
    )

    # Missing join partners and blank CSV cells become 0.0 so X has no NaNs.
    X = X.fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
    }

    return X, y, meta
|
|
|
|
def transform_single(record: dict | pd.DataFrame, meta: dict[str, Any]) -> np.ndarray:
    """
    Encode a single flow record for inference, mirroring build_features.

    `record` must contain the same columns as network_flows.csv (minus the
    leaky columns), plus the joined session and topology fields. If you only
    have the flow row, you must look up the matching session_summary row and
    network_topology row and merge them into `record` before calling this.

    Parameters
    ----------
    record : dict or pd.DataFrame
        One flow as a dict, or a DataFrame holding the flow row(s). A
        DataFrame may carry any index, for example a row sliced out of a
        larger frame; it is re-indexed internally.
    meta : dict
        The dict returned by build_features (or load_meta). Reads
        "numeric_features", "categorical_levels" and "feature_names".

    Returns
    -------
    np.ndarray
        float32 array of shape (n_rows, len(meta["feature_names"])), i.e.
        (1, n_features) for a single record, ready for model.predict_proba.
        Missing numeric columns encode as 0.0. Missing categorical values, or
        values unseen in training, encode as all-zero one-hot columns.

    Raises
    ------
    KeyError
        If a raw column needed by the engineered features is absent.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record])
    else:
        # All blocks below must share one RangeIndex: pd.concat(axis=1) aligns
        # on index, so a row labelled e.g. 57 would otherwise be split across
        # several partially-zero output rows.
        df = record.reset_index(drop=True)

    df = _add_engineered_features(df)
    n_rows = len(df)

    # Numeric block in training column order; absent columns become 0.0.
    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).to_numpy()
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        values = df.get(col, pd.Series([None] * n_rows, index=df.index))
        # Pinning the categories to the training levels makes get_dummies emit
        # one column per level. Unseen values become NaN, i.e. all zeros.
        dummies = pd.get_dummies(
            values.astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        expected_columns = [f"{col}_{lvl}" for lvl in levels]
        blocks.append(dummies.reindex(columns=expected_columns, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)

    # Final guard: exactly the training column set, in the training order.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.to_numpy(dtype=np.float32)
|
|
|
|
| def save_meta(meta: dict[str, Any], path: str | Path) -> None: |
| """Persist meta to JSON for inference-time reuse.""" |
| serializable = { |
| "feature_names": meta["feature_names"], |
| "numeric_features": meta["numeric_features"], |
| "categorical_levels": meta["categorical_levels"], |
| "label_to_int": meta["label_to_int"], |
| "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()}, |
| } |
| with open(path, "w") as f: |
| json.dump(serializable, f, indent=2) |
|
|
|
|
| def load_meta(path: str | Path) -> dict[str, Any]: |
| """Load meta from JSON.""" |
| with open(path) as f: |
| meta = json.load(f) |
| meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()} |
| return meta |
|
|
|
|
if __name__ == "__main__":
    # Smoke run: `python feature_engineering.py [DATA_DIR]` builds the
    # training matrix from the three CSVs in DATA_DIR (default below) and
    # prints a short summary.
    import sys

    cli_args = sys.argv[1:]
    data_dir = Path(cli_args[0]) if cli_args else Path("/mnt/user-data/uploads")
    csv_names = ("network_flows.csv", "session_summary.csv", "network_topology.csv")
    X, y, meta = build_features(*(data_dir / name for name in csv_names))

    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X dtypes unique: {X.dtypes.unique()}")
    print(f"X has NaN: {X.isnull().any().any()}")
|
|