"""Feature engineering for C2 beaconing detection from NetFlow/Palo Alto logs."""

from typing import List, Tuple

import numpy as np
import pandas as pd

# Base NetFlow v3 features available in keys-i/netFlow dataset
BASE_NETFLOW_FEATURES = [
    'FLOW_DURATION_MILLISECONDS',
    'IN_BYTES',
    'IN_PKTS',
    'OUT_BYTES',
    'OUT_PKTS',
    'LONGEST_FLOW_PKT',
    'SHORTEST_FLOW_PKT',
    'MIN_IP_PKT_LEN',
    'MAX_IP_PKT_LEN',
    'SRC_TO_DST_AVG_THROUGHPUT',
    'DST_TO_SRC_AVG_THROUGHPUT',
    'SRC_TO_DST_IAT_MIN',
    'SRC_TO_DST_IAT_MAX',
    'SRC_TO_DST_IAT_AVG',
    'SRC_TO_DST_IAT_STDDEV',
    'DST_TO_SRC_IAT_MIN',
    'DST_TO_SRC_IAT_MAX',
    'DST_TO_SRC_IAT_AVG',
    'DST_TO_SRC_IAT_STDDEV',
    'NUM_PKTS_UP_TO_128_BYTES',
    'NUM_PKTS_128_TO_256_BYTES',
    'NUM_PKTS_256_TO_512_BYTES',
    'NUM_PKTS_512_TO_1024_BYTES',
    'NUM_PKTS_1024_TO_1514_BYTES',
    'RETRANSMITTED_IN_BYTES',
    'RETRANSMITTED_OUT_BYTES',
    'TCP_WIN_MAX_IN',
    'TCP_WIN_MAX_OUT',
    'MIN_TTL',
    'MAX_TTL',
]

# Attack categories that indicate C2-like periodic traffic in CIC-IDS2018
C2_LIKE_ATTACKS = [
    'Bot',
    'Infiltration',
    'Heartbleed',
    'FTP-Patator',
    'SSH-Patator',
]

# Packet-size histogram columns, ordered from smallest to largest bin.
# Shared by the packet-distribution ratios and the entropy feature.
_PKT_SIZE_BIN_COLUMNS = [
    'NUM_PKTS_UP_TO_128_BYTES',
    'NUM_PKTS_128_TO_256_BYTES',
    'NUM_PKTS_256_TO_512_BYTES',
    'NUM_PKTS_512_TO_1024_BYTES',
    'NUM_PKTS_1024_TO_1514_BYTES',
]


def label_is_c2(attack_series: pd.Series) -> pd.Series:
    """Map attack labels to binary C2-like beaconing indicator.

    Args:
        attack_series: Attack category labels, one per flow.

    Returns:
        An integer Series with 1 where the label is listed in
        ``C2_LIKE_ATTACKS`` (exact match) and 0 otherwise.
    """
    return attack_series.isin(C2_LIKE_ATTACKS).astype(int)


def engineer_beaconing_features(df: pd.DataFrame) -> pd.DataFrame:
    """Engineer beaconing-specific features from base NetFlow features.

    The input frame is not modified; a copy with the engineered columns
    appended is returned.

    Args:
        df: Flow records containing the base NetFlow columns read below.

    Returns:
        A new DataFrame holding the original columns plus the engineered
        ones. Infinite values are replaced by NaN and every NaN in the
        frame is then filled with 0.

    Raises:
        KeyError: If a base column used in the computations is missing.
    """
    df = df.copy()

    # Added to every denominator to avoid division by zero.
    eps = 1e-9

    # ---- Timing / Periodicity Features (Core for beaconing) ----
    df['jitter_ratio_src'] = df['SRC_TO_DST_IAT_STDDEV'] / (df['SRC_TO_DST_IAT_AVG'] + eps)
    df['jitter_ratio_dst'] = df['DST_TO_SRC_IAT_STDDEV'] / (df['DST_TO_SRC_IAT_AVG'] + eps)
    df['jitter_ratio_avg'] = (df['jitter_ratio_src'] + df['jitter_ratio_dst']) / 2

    # Coefficient of variation for IAT. This is the same stddev / mean
    # quantity as the jitter ratios above; both names are kept so the
    # published feature list stays unchanged.
    df['cv_iat_src'] = df['SRC_TO_DST_IAT_STDDEV'] / (df['SRC_TO_DST_IAT_AVG'] + eps)
    df['cv_iat_dst'] = df['DST_TO_SRC_IAT_STDDEV'] / (df['DST_TO_SRC_IAT_AVG'] + eps)

    # Flow duration relative to IAT (beacons often have short flows with
    # consistent IAT)
    df['duration_to_iat_ratio'] = df['FLOW_DURATION_MILLISECONDS'] / (df['SRC_TO_DST_IAT_AVG'] + eps)

    # ---- Size Consistency Features ----
    df['pkt_size_range'] = df['MAX_IP_PKT_LEN'] - df['MIN_IP_PKT_LEN']
    df['flow_pkt_size_range'] = df['LONGEST_FLOW_PKT'] - df['SHORTEST_FLOW_PKT']

    # Bytes per packet ratios
    df['bytes_per_pkt_in'] = df['IN_BYTES'] / (df['IN_PKTS'] + eps)
    df['bytes_per_pkt_out'] = df['OUT_BYTES'] / (df['OUT_PKTS'] + eps)
    df['bytes_per_pkt_ratio'] = df['bytes_per_pkt_in'] / (df['bytes_per_pkt_out'] + eps)

    # Packet size consistency score (low = consistent = beacon-like)
    df['pkt_size_consistency'] = df['pkt_size_range'] / (df['MAX_IP_PKT_LEN'] + eps)

    # ---- Volume / Rate Features ----
    df['total_bytes'] = df['IN_BYTES'] + df['OUT_BYTES']
    df['total_pkts'] = df['IN_PKTS'] + df['OUT_PKTS']
    df['byte_ratio'] = df['OUT_BYTES'] / (df['IN_BYTES'] + eps)
    df['pkt_ratio'] = df['OUT_PKTS'] / (df['IN_PKTS'] + eps)

    # Throughput ratios
    df['throughput_ratio'] = df['SRC_TO_DST_AVG_THROUGHPUT'] / (df['DST_TO_SRC_AVG_THROUGHPUT'] + eps)

    # Flow bytes / packets per millisecond
    df['bytes_per_ms'] = df['total_bytes'] / (df['FLOW_DURATION_MILLISECONDS'] + eps)
    df['pkts_per_ms'] = df['total_pkts'] / (df['FLOW_DURATION_MILLISECONDS'] + eps)

    # ---- Packet Distribution Features ----
    # Sum of the size-binned packet counters. Kept separate from the
    # 'total_pkts' column (IN_PKTS + OUT_PKTS), which is a different quantity.
    binned_pkts = df[_PKT_SIZE_BIN_COLUMNS].sum(axis=1) + eps
    df['small_pkt_ratio'] = df['NUM_PKTS_UP_TO_128_BYTES'] / binned_pkts
    df['medium_pkt_ratio'] = (
        df['NUM_PKTS_128_TO_256_BYTES'] + df['NUM_PKTS_256_TO_512_BYTES']
    ) / binned_pkts
    df['large_pkt_ratio'] = (
        df['NUM_PKTS_512_TO_1024_BYTES'] + df['NUM_PKTS_1024_TO_1514_BYTES']
    ) / binned_pkts

    # Entropy-like measure of packet size distribution
    # (low = focused = beacon-like)
    pkt_dist = df[_PKT_SIZE_BIN_COLUMNS].div(binned_pkts, axis=0)
    # Shannon entropy of packet size distribution; eps keeps log2 finite
    # for empty bins.
    df['pkt_size_entropy'] = -(pkt_dist * np.log2(pkt_dist + eps)).sum(axis=1)

    # ---- Retransmission / Error Features ----
    df['retrans_ratio'] = (
        df['RETRANSMITTED_IN_BYTES'] + df['RETRANSMITTED_OUT_BYTES']
    ) / (df['total_bytes'] + eps)

    # ---- TCP Features ----
    df['tcp_win_ratio'] = df['TCP_WIN_MAX_OUT'] / (df['TCP_WIN_MAX_IN'] + eps)

    # ---- TTL Features ----
    df['ttl_range'] = df['MAX_TTL'] - df['MIN_TTL']

    # ---- Beaconing Composite Score ----
    # Low jitter + consistent packet sizes + low volume = high beacon score
    df['beacon_score'] = (
        (1.0 / (1.0 + df['jitter_ratio_avg'])) * 0.4
        + (1.0 / (1.0 + df['pkt_size_consistency'])) * 0.3
        + (1.0 / (1.0 + df['total_bytes'] / 1000.0)) * 0.3
    )

    # Replace infinite values with NaN, then zero-fill every NaN.
    df = df.replace([np.inf, -np.inf], np.nan).fillna(0)

    return df


def get_feature_columns() -> List[str]:
    """Return the full list of feature columns for model training.

    Returns:
        The base NetFlow feature names followed by the engineered feature
        names, as a new list on every call.
    """
    base = BASE_NETFLOW_FEATURES.copy()
    engineered = [
        'jitter_ratio_src',
        'jitter_ratio_dst',
        'jitter_ratio_avg',
        'cv_iat_src',
        'cv_iat_dst',
        'duration_to_iat_ratio',
        'pkt_size_range',
        'flow_pkt_size_range',
        'bytes_per_pkt_in',
        'bytes_per_pkt_out',
        'bytes_per_pkt_ratio',
        'pkt_size_consistency',
        'total_bytes',
        'total_pkts',
        'byte_ratio',
        'pkt_ratio',
        'throughput_ratio',
        'bytes_per_ms',
        'pkts_per_ms',
        'small_pkt_ratio',
        'medium_pkt_ratio',
        'large_pkt_ratio',
        'pkt_size_entropy',
        'retrans_ratio',
        'tcp_win_ratio',
        'ttl_range',
        'beacon_score',
    ]
    return base + engineered


def prepare_training_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
    """Prepare features and labels from raw NetFlow dataframe.

    Base NetFlow columns absent from ``df`` are zero-filled before feature
    engineering, so a partially populated export still yields the complete
    feature matrix instead of raising ``KeyError``. The caller's frame is
    never modified.

    Args:
        df: Raw flow records. May contain an 'Attack' column (category
            labels) or a 'Label' column (used as-is).

    Returns:
        A ``(X, y)`` tuple. ``X`` holds the columns from
        ``get_feature_columns()`` in that order. ``y`` is the binary
        C2 indicator derived from 'Attack' when present, otherwise the
        'Label' column, otherwise an all-zero Series aligned to ``df``.
    """
    # The zero-fill has to happen before engineering: the engineered
    # features index the base columns directly, so a guard applied
    # afterwards could never be reached for a missing input column.
    missing_base = [col for col in BASE_NETFLOW_FEATURES if col not in df.columns]
    if missing_base:
        # assign() returns a new frame, leaving the caller's data untouched.
        df = df.assign(**dict.fromkeys(missing_base, 0))

    df = engineer_beaconing_features(df)

    # Every base column now exists and engineering adds all derived ones.
    feature_cols = get_feature_columns()
    X = df[feature_cols]

    if 'Attack' in df.columns:
        y = label_is_c2(df['Attack'])
    else:
        y = df.get('Label', pd.Series(0, index=df.index))

    return X, y