cyb001-baseline-classifier / feature_engineering.py
pradeep-xpert's picture
Initial release: XGBoost + MLP baseline on CYB001 sample
721fce4 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB001 baseline classifier.
This module produces a flow-level feature matrix and label vector from the
four CSV files distributed with the CYB001 sample dataset on Hugging Face:
network_flows.csv (primary, one row per flow)
session_summary.csv (one row per session, joined on session_id)
network_topology.csv (one row per network segment, joined on segment_id)
flow_events.csv (one row per security event - NOT used for v1
features; flows lose temporal granularity if
aggregated naively. Reserved for future work.)
The pipeline is deliberately written to be read end-to-end. Every dropped
column is dropped with a one-line explanation. Every engineered feature
sits next to a one-sentence motivation. If you are evaluating the CYB001
product, this file is the feature recipe; what the model "sees" is exactly
what this file emits.
Public API
----------
build_features(flows_path, sessions_path, topology_path) -> (X, y, meta)
X : pd.DataFrame - feature matrix, all numeric, no NaNs
y : pd.Series - integer-encoded label (0=BENIGN, 1=MALICIOUS, 2=AMBIGUOUS)
meta : dict - {feature_names, numeric_features, categorical_levels, label_to_int, int_to_label}
The same `meta` dict is used at inference time so a new flow record gets
encoded identically to training.
transform_single(record, meta) -> np.ndarray
Encode a single flow record (dict or 1-row DataFrame) for inference.
License
-------
This file ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Constants - what we keep, what we drop, and why
# ---------------------------------------------------------------------------

# Class names in encoding order: the list position is the integer label.
LABEL_ORDER = ["BENIGN", "MALICIOUS", "AMBIGUOUS"]
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))

# network_flows.csv columns that are ground-truth generator metadata rather
# than observables a real IDS would have at inference time. Keeping any of
# them yields perfect or near-perfect accuracy that does not reflect
# real-world performance, so they are dropped before feature assembly.
LEAKY_FLOW_COLUMNS = [
    # 100% deterministic of label (attack_*/benign_*/ambiguous_*)
    "traffic_category",
    # null iff label != MALICIOUS
    "attack_subcategory",
    # labeled per flow including benign - generator metadata
    "attacker_capability_tier",
]

# Identifier / non-feature columns.
ID_COLUMNS = [
    "flow_id",
    "session_id",
    # SHA-256 pseudonyms, not useful as features
    "source_ip_hash",
    "destination_ip_hash",
    # consumed by the hour_of_day / is_off_hours engineered features
    "flow_start_timestamp",
]

# Numeric network_flows.csv columns that pass straight through as features.
DIRECT_NUMERIC_FLOW_FEATURES = [
    "source_port",
    "dest_port",
    "flow_duration_ms",
    "total_fwd_packets",
    "total_bwd_packets",
    "total_bytes_fwd",
    "total_bytes_bwd",
    "fwd_packet_len_mean",
    "fwd_packet_len_std",
    "bwd_packet_len_mean",
    "bwd_packet_len_std",
    "flow_bytes_per_sec",
    "flow_packets_per_sec",
    "inter_arrival_time_mean",
    "inter_arrival_time_std",
    "tcp_flag_syn_count",
    "tcp_flag_ack_count",
    "tcp_flag_fin_count",
    "tcp_flag_rst_count",
    "tcp_flag_psh_count",
    "tcp_flag_urg_count",
    "retransmission_flag",
    "fragmentation_flag",
    "protocol_violation_flag",
]

# Session-level numeric features (joined on session_id), chosen after a
# per-label leakage audit:
#   KEEP: payload_entropy_mean, retransmission_rate, protocol_violation_count,
#         c2_beacon_flag, session_risk_score
#         (overlapping distributions across labels)
#   DROP: exfil_volume_bytes, scan_probe_count, lateral_move_flag
#         (zero for all BENIGN/AMBIGUOUS - generator oracles, not detector
#         outputs)
SESSION_FEATURES_KEEP = [
    "payload_entropy_mean",
    "retransmission_rate",
    "protocol_violation_count",
    "c2_beacon_flag",
    "session_risk_score",
]

# Topology-level numeric features (joined on segment_id).
TOPOLOGY_NUMERIC_FEATURES = [
    "trust_level",
    "avg_concurrent_flows",
    "bandwidth_mbps",
    "nat_enabled",
    "ids_coverage",
    "diurnal_peak_factor",
    "feature_space_dim",
    "alert_threshold",
    "retraining_cadence_days",
    "ensemble_size",
    "device_count",
]

# Categorical columns that get one-hot encoded, as (column, source table)
# pairs. The source tag decides which columns are pulled from the topology
# table during the join.
CATEGORICAL_FEATURES = [
    # TCP / UDP / HTTPS / DNS / SMTP / SSH / FTP / NTP
    ("protocol", "flows"),
    # initiation / handshake / transfer / ...
    ("flow_lifecycle_phase", "flows"),
    # workstation / server / iot / mobile / cloud / ot
    ("source_device_type", "flows"),
    ("dest_device_type", "flows"),
    # corporate_lan / dmz / cloud_workload / ...
    ("segment_type", "topology"),
    ("firewall_policy", "topology"),
    ("qos_policy", "topology"),
    ("defender_architecture", "topology"),
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _safe_divide(num: pd.Series, denom: pd.Series, fill: float = 0.0) -> pd.Series:
    """
    Divide `num` by `denom` element-wise without leaking inf or NaN.

    Zero denominators are masked to NaN before the division; any quotient
    that is still infinite or NaN afterwards is replaced by `fill`.
    """
    masked_denom = denom.replace(0, np.nan)
    quotient = num.div(masked_denom)
    without_inf = quotient.replace([np.inf, -np.inf], np.nan)
    return without_inf.fillna(fill)
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with nine hand-written engineered columns added.

    The features encode domain hypotheses about how each label class
    behaves. They are NOT learned; they are stated by hand so a buyer can
    read this function and see what the model is told to look at. Tree
    models can recover most of these on their own, but giving them
    explicitly improves both XGBoost convergence and MLP performance.

    Columns added: iat_cv, fwd_bwd_byte_ratio, bytes_per_packet_fwd,
    tcp_flag_anomaly_score, payload_density, hour_of_day, is_off_hours,
    is_well_known_dest_port, is_ephemeral_src_port.

    Every ratio goes through `_safe_divide`, so a zero denominator (or an
    inf/NaN quotient) yields 0.0 uniformly across all ratio features.

    The input frame is not modified. A KeyError is raised if any raw column
    read below is absent from `df`.
    """
    df = df.copy()

    # IAT coefficient of variation. Low cv => regular inter-arrival times
    # => C2 beacon signature (the dataset is calibrated to cv ~= 0.065 for
    # APT beacons, regularity score ~= 0.93 per the README).
    df["iat_cv"] = _safe_divide(
        df["inter_arrival_time_std"], df["inter_arrival_time_mean"]
    )

    # Forward/backward byte ratio. >> 1 indicates upload-heavy flow, which
    # is the exfiltration signature.
    df["fwd_bwd_byte_ratio"] = _safe_divide(
        df["total_bytes_fwd"], df["total_bytes_bwd"]
    )

    # Bytes per packet (forward direction). Combined with packet length
    # std, separates streaming traffic from short-message protocols.
    df["bytes_per_packet_fwd"] = _safe_divide(
        df["total_bytes_fwd"], df["total_fwd_packets"]
    )

    # TCP flag anomaly score. RST and URG together, or high counts relative
    # to total packets, indicate scan/probe or protocol misuse.
    total_packets = df["total_fwd_packets"] + df["total_bwd_packets"]
    flag_total = (
        df["tcp_flag_rst_count"]
        + df["tcp_flag_urg_count"]
        + df["tcp_flag_fin_count"]
    )
    df["tcp_flag_anomaly_score"] = _safe_divide(flag_total, total_packets)

    # Payload density. Bytes per packet, normalized to MTU. Low density on
    # high packet counts indicates beaconing or keep-alive.
    mtu_bytes = 1500
    total_bytes = df["total_bytes_fwd"] + df["total_bytes_bwd"]
    df["payload_density"] = _safe_divide(total_bytes, total_packets * mtu_bytes)

    # Hour of day from timestamp. Off-hours bias is calibrated into the
    # APT and insider-threat tiers. Unparseable timestamps become NaT and
    # are imputed as hour 12, which lands outside the off-hours window.
    ts = pd.to_datetime(df["flow_start_timestamp"], errors="coerce")
    hour = ts.dt.hour.fillna(12).astype(int)
    df["hour_of_day"] = hour
    df["is_off_hours"] = ((hour < 6) | (hour > 22)).astype(int)

    # Port observables. Well-known ports < 1024, ephemeral ports >= 49152.
    df["is_well_known_dest_port"] = (df["dest_port"] < 1024).astype(int)
    df["is_ephemeral_src_port"] = (df["source_port"] >= 49152).astype(int)

    return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    flows_path: str | Path,
    sessions_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, dict[str, Any]]:
    """
    Load the three CSVs, join them, drop leaky columns, engineer features,
    one-hot encode categoricals, and return (X, y, meta).

    The returned `meta` dict captures the column order and the categorical
    level set, which is what `transform_single` needs at inference time to
    encode a new record identically.

    Parameters
    ----------
    flows_path : path to network_flows.csv (one row per flow; must carry
        `label`, `session_id` and `segment_id` columns).
    sessions_path : path to session_summary.csv (joined on `session_id`).
    topology_path : path to network_topology.csv (joined on `segment_id`).

    Returns
    -------
    X : numeric feature matrix with NaNs filled by 0.0, one row per flow.
    y : integer-encoded labels following LABEL_TO_INT.
    meta : dict with keys feature_names, numeric_features,
        categorical_levels, label_to_int, int_to_label.

    Raises
    ------
    ValueError
        If any flow has a missing label or one outside LABEL_ORDER.
    pandas.errors.MergeError
        If session_id / segment_id is duplicated in the session or topology
        table (the join would otherwise silently duplicate flow rows).
    KeyError
        If an expected column is absent from one of the CSVs.
    """
    flows = pd.read_csv(flows_path)
    sessions = pd.read_csv(sessions_path)
    topology = pd.read_csv(topology_path)

    # Drop columns that leak the label (see LEAKY_FLOW_COLUMNS for rationale)
    flows = flows.drop(columns=LEAKY_FLOW_COLUMNS, errors="ignore")

    # Join session-level aggregates. validate= enforces the documented
    # one-row-per-session shape so X and y keep exactly one row per flow.
    df = flows.merge(
        sessions[["session_id"] + SESSION_FEATURES_KEEP],
        on="session_id",
        how="left",
        validate="many_to_one",
    )

    # Join topology features (numeric + categorical), same cardinality check.
    topology_categoricals = [
        col for col, src in CATEGORICAL_FEATURES if src == "topology"
    ]
    topo_cols = ["segment_id"] + TOPOLOGY_NUMERIC_FEATURES + topology_categoricals
    df = df.merge(
        topology[topo_cols],
        on="segment_id",
        how="left",
        validate="many_to_one",
    )

    # Extract labels before adding features. Unknown or missing labels map
    # to NaN; report them by name instead of failing inside the int cast.
    encoded_labels = df["label"].map(LABEL_TO_INT)
    unmapped = encoded_labels.isna()
    if unmapped.any():
        offending = sorted(df.loc[unmapped, "label"].astype(str).unique())
        raise ValueError(
            f"Unrecognised or missing label value(s) {offending}; "
            f"expected one of {LABEL_ORDER}"
        )
    y = encoded_labels.astype(int)

    # Engineered features
    df = _add_engineered_features(df)

    # Assemble feature columns
    numeric_features = (
        DIRECT_NUMERIC_FLOW_FEATURES
        + SESSION_FEATURES_KEEP
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "iat_cv", "fwd_bwd_byte_ratio", "bytes_per_packet_fwd",
            "tcp_flag_anomaly_score", "payload_density",
            "hour_of_day", "is_off_hours",
            "is_well_known_dest_port", "is_ephemeral_src_port",
        ]
    )
    X_numeric = df[numeric_features].astype(float)

    # One-hot encode categoricals. Record the level set in `meta` so we can
    # reproduce the same columns at inference time even if a new record
    # contains an unseen level (it will encode to all-zero, which is the
    # correct fallback for one-hot).
    categorical_levels: dict[str, list[str]] = {}
    one_hot_blocks: list[pd.DataFrame] = []
    for col, _src in CATEGORICAL_FEATURES:
        levels = sorted(df[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            df[col].astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        one_hot_blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in one_hot_blocks],
        axis=1,
    )

    # Final NaN sweep (defensive - session join can introduce NaN if a
    # session_id is missing from session_summary.csv).
    X = X.fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        # Copies, so a caller editing meta cannot corrupt the module-level
        # label maps shared by every other call.
        "label_to_int": dict(LABEL_TO_INT),
        "int_to_label": dict(INT_TO_LABEL),
    }
    return X, y, meta
def transform_single(record: dict | pd.DataFrame, meta: dict[str, Any]) -> np.ndarray:
    """
    Encode a single flow record for inference.

    `record` must contain the same columns as network_flows.csv (minus the
    leaky columns), plus the joined session and topology fields. If you only
    have the flow row, you must look up the matching session_summary row and
    network_topology row and merge them into `record` before calling this.

    Parameters
    ----------
    record : a dict (one flow) or a DataFrame. Any DataFrame index is
        accepted; it is discarded before encoding. The caller's object is
        not modified.
    meta : the dict returned by `build_features` (or `load_meta`); the keys
        read here are numeric_features, categorical_levels and
        feature_names.

    Returns
    -------
    A float32 numpy array of shape (n_rows, n_features) - (1, n_features)
    for a single record - with columns ordered as meta["feature_names"],
    ready for model.predict_proba. Numeric or categorical columns missing
    from the record encode as zeros; categorical levels not seen at fit
    time encode as an all-zero one-hot block.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record])
    else:
        # A row sliced out of a larger frame (e.g. flows.iloc[[5]]) keeps its
        # original index label. The numeric block below is rebuilt from raw
        # values on a fresh 0..n-1 index, so without this reset the axis=1
        # concat would align the blocks on mismatched labels and emit
        # extra, half-zero rows.
        df = record.reset_index(drop=True)
    df = _add_engineered_features(df)
    n_rows = len(df)

    # Numeric features in fixed order; absent columns default to 0.0.
    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).to_numpy()
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    # One-hot blocks in fixed order, using the levels seen at fit time
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        raw = df.get(col, pd.Series([None] * n_rows, index=df.index))
        expected_columns = [f"{col}_{lvl}" for lvl in levels]
        one_hot = pd.get_dummies(
            raw.astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        # Pin both the presence and the order of every fit-time level column.
        blocks.append(one_hot.reindex(columns=expected_columns, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)

    # Reorder to match training column order exactly
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.to_numpy().astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
"""Persist meta to JSON for inference-time reuse."""
serializable = {
"feature_names": meta["feature_names"],
"numeric_features": meta["numeric_features"],
"categorical_levels": meta["categorical_levels"],
"label_to_int": meta["label_to_int"],
"int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
}
with open(path, "w") as f:
json.dump(serializable, f, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
"""Load meta from JSON."""
with open(path) as f:
meta = json.load(f)
meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
return meta
if __name__ == "__main__":
    # Smoke test: build the feature matrix from a directory holding the
    # three CSVs (first command-line argument, otherwise the default upload
    # directory) and print a short sanity report.
    import sys

    cli_args = sys.argv[1:]
    if cli_args:
        base = Path(cli_args[0])
    else:
        base = Path("/mnt/user-data/uploads")

    X, y, meta = build_features(
        flows_path=base / "network_flows.csv",
        sessions_path=base / "session_summary.csv",
        topology_path=base / "network_topology.csv",
    )

    # Shapes and feature count.
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n features: {len(meta['feature_names'])}")
    # Class balance, then the dtype and NaN state of the matrix.
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X dtypes unique: {X.dtypes.unique()}")
    print(f"X has NaN: {X.isnull().any().any()}")