| """Feature engineering for C2 beaconing detection from NetFlow/Palo Alto logs.""" |
| import numpy as np |
| import pandas as pd |
| from typing import List, Tuple |
|
|
| |
# Raw NetFlow columns used as model inputs. The order matters: it is the
# leading part of the column order returned by get_feature_columns().
BASE_NETFLOW_FEATURES = [
    # Flow duration
    'FLOW_DURATION_MILLISECONDS',
    # Byte / packet counters per direction
    'IN_BYTES',
    'IN_PKTS',
    'OUT_BYTES',
    'OUT_PKTS',
    # Packet length extremes
    'LONGEST_FLOW_PKT',
    'SHORTEST_FLOW_PKT',
    'MIN_IP_PKT_LEN',
    'MAX_IP_PKT_LEN',
    # Average throughput per direction
    'SRC_TO_DST_AVG_THROUGHPUT',
    'DST_TO_SRC_AVG_THROUGHPUT',
    # IAT (inter-arrival time) statistics, source -> destination
    'SRC_TO_DST_IAT_MIN',
    'SRC_TO_DST_IAT_MAX',
    'SRC_TO_DST_IAT_AVG',
    'SRC_TO_DST_IAT_STDDEV',
    # IAT (inter-arrival time) statistics, destination -> source
    'DST_TO_SRC_IAT_MIN',
    'DST_TO_SRC_IAT_MAX',
    'DST_TO_SRC_IAT_AVG',
    'DST_TO_SRC_IAT_STDDEV',
    # Packet-size histogram buckets
    'NUM_PKTS_UP_TO_128_BYTES',
    'NUM_PKTS_128_TO_256_BYTES',
    'NUM_PKTS_256_TO_512_BYTES',
    'NUM_PKTS_512_TO_1024_BYTES',
    'NUM_PKTS_1024_TO_1514_BYTES',
    # Retransmitted bytes per direction
    'RETRANSMITTED_IN_BYTES',
    'RETRANSMITTED_OUT_BYTES',
    # Maximum TCP window per direction
    'TCP_WIN_MAX_IN',
    'TCP_WIN_MAX_OUT',
    # TTL extremes
    'MIN_TTL',
    'MAX_TTL',
]
|
|
| |
# Attack labels that label_is_c2() maps to the positive (1) class; every
# other label value maps to 0.
C2_LIKE_ATTACKS = [
    'Bot',
    'Infiltration',
    'Heartbleed',
    'FTP-Patator',
    'SSH-Patator',
]
|
|
|
|
def label_is_c2(attack_series: pd.Series) -> pd.Series:
    """Convert attack labels into a 0/1 C2-like indicator.

    Args:
        attack_series: Series of attack label values.

    Returns:
        Integer Series on the same index: 1 where the label is one of
        ``C2_LIKE_ATTACKS`` (exact match), 0 otherwise.
    """
    matches_c2 = attack_series.isin(C2_LIKE_ATTACKS)
    return matches_c2.astype(int)
|
|
|
|
def engineer_beaconing_features(df: pd.DataFrame) -> pd.DataFrame:
    """Engineer beaconing-specific features from base NetFlow features.

    The input frame is copied first, so the caller's frame is not modified.

    Args:
        df: Frame containing the raw NetFlow columns read below (flow
            duration, byte/packet counters, IAT average/stddev, packet
            length extremes, packet-size histogram, throughput,
            retransmission, TCP window and TTL columns). Any additional
            columns are carried through.

    Returns:
        A copy of ``df`` with the engineered columns appended. In numeric
        columns, +/-inf and NaN are replaced by 0. Non-numeric columns
        (labels, addresses, ...) are returned exactly as they came in.

    Raises:
        KeyError: If one of the required raw columns is missing.
    """
    df = df.copy()

    # Added to every denominator so zero counts/durations divide cleanly.
    eps = 1e-9

    # --- Timing regularity: stddev / mean of inter-arrival times ---------
    df['jitter_ratio_src'] = df['SRC_TO_DST_IAT_STDDEV'] / (df['SRC_TO_DST_IAT_AVG'] + eps)
    df['jitter_ratio_dst'] = df['DST_TO_SRC_IAT_STDDEV'] / (df['DST_TO_SRC_IAT_AVG'] + eps)
    df['jitter_ratio_avg'] = (df['jitter_ratio_src'] + df['jitter_ratio_dst']) / 2

    # cv_iat_* is the same quantity as jitter_ratio_*; both names are kept
    # so the set of output columns stays unchanged.
    df['cv_iat_src'] = df['jitter_ratio_src']
    df['cv_iat_dst'] = df['jitter_ratio_dst']

    # Flow duration expressed in units of the average src->dst IAT.
    df['duration_to_iat_ratio'] = df['FLOW_DURATION_MILLISECONDS'] / (df['SRC_TO_DST_IAT_AVG'] + eps)

    # --- Packet size spread ----------------------------------------------
    df['pkt_size_range'] = df['MAX_IP_PKT_LEN'] - df['MIN_IP_PKT_LEN']
    df['flow_pkt_size_range'] = df['LONGEST_FLOW_PKT'] - df['SHORTEST_FLOW_PKT']

    # Mean payload per packet in each direction, and their ratio.
    df['bytes_per_pkt_in'] = df['IN_BYTES'] / (df['IN_PKTS'] + eps)
    df['bytes_per_pkt_out'] = df['OUT_BYTES'] / (df['OUT_PKTS'] + eps)
    df['bytes_per_pkt_ratio'] = df['bytes_per_pkt_in'] / (df['bytes_per_pkt_out'] + eps)

    # Size range relative to the largest packet (0 means all packets equal).
    df['pkt_size_consistency'] = df['pkt_size_range'] / (df['MAX_IP_PKT_LEN'] + eps)

    # --- Volume and direction asymmetry ----------------------------------
    df['total_bytes'] = df['IN_BYTES'] + df['OUT_BYTES']
    df['total_pkts'] = df['IN_PKTS'] + df['OUT_PKTS']
    df['byte_ratio'] = df['OUT_BYTES'] / (df['IN_BYTES'] + eps)
    df['pkt_ratio'] = df['OUT_PKTS'] / (df['IN_PKTS'] + eps)

    df['throughput_ratio'] = df['SRC_TO_DST_AVG_THROUGHPUT'] / (df['DST_TO_SRC_AVG_THROUGHPUT'] + eps)

    # Rates over the flow duration.
    df['bytes_per_ms'] = df['total_bytes'] / (df['FLOW_DURATION_MILLISECONDS'] + eps)
    df['pkts_per_ms'] = df['total_pkts'] / (df['FLOW_DURATION_MILLISECONDS'] + eps)

    # --- Packet-size histogram -------------------------------------------
    size_buckets = [
        'NUM_PKTS_UP_TO_128_BYTES',
        'NUM_PKTS_128_TO_256_BYTES',
        'NUM_PKTS_256_TO_512_BYTES',
        'NUM_PKTS_512_TO_1024_BYTES',
        'NUM_PKTS_1024_TO_1514_BYTES',
    ]
    # Sum of the histogram buckets. This is a different quantity from the
    # 'total_pkts' column above (IN_PKTS + OUT_PKTS).
    bucket_total = df[size_buckets].sum(axis=1) + eps

    df['small_pkt_ratio'] = df['NUM_PKTS_UP_TO_128_BYTES'] / bucket_total
    df['medium_pkt_ratio'] = (
        df['NUM_PKTS_128_TO_256_BYTES'] + df['NUM_PKTS_256_TO_512_BYTES']
    ) / bucket_total
    df['large_pkt_ratio'] = (
        df['NUM_PKTS_512_TO_1024_BYTES'] + df['NUM_PKTS_1024_TO_1514_BYTES']
    ) / bucket_total

    # Shannon entropy (bits) of the bucket distribution. The eps inside the
    # log keeps empty buckets finite, but it can push the result a hair
    # below zero when one bucket holds everything, so clamp at 0.
    pkt_dist = df[size_buckets].div(bucket_total, axis=0)
    entropy = -(pkt_dist * np.log2(pkt_dist + eps)).sum(axis=1)
    df['pkt_size_entropy'] = entropy.clip(lower=0.0)

    # --- Transport-level ratios ------------------------------------------
    df['retrans_ratio'] = (
        df['RETRANSMITTED_IN_BYTES'] + df['RETRANSMITTED_OUT_BYTES']
    ) / (df['total_bytes'] + eps)

    df['tcp_win_ratio'] = df['TCP_WIN_MAX_OUT'] / (df['TCP_WIN_MAX_IN'] + eps)

    df['ttl_range'] = df['MAX_TTL'] - df['MIN_TTL']

    # --- Composite heuristic ---------------------------------------------
    # Weighted sum (0.4 / 0.3 / 0.3) of three 1 / (1 + x) terms: the score
    # grows as jitter, relative packet-size range and total bytes shrink.
    df['beacon_score'] = (
        (1.0 / (1.0 + df['jitter_ratio_avg'])) * 0.4 +
        (1.0 / (1.0 + df['pkt_size_consistency'])) * 0.3 +
        (1.0 / (1.0 + df['total_bytes'] / 1000.0)) * 0.3
    )

    # Neutralise inf/NaN, but only in numeric columns: running fillna(0)
    # over the whole frame would overwrite missing labels and other
    # non-numeric values with the integer 0.
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df[numeric_cols] = df[numeric_cols].replace([np.inf, -np.inf], np.nan).fillna(0)

    return df
|
|
|
|
def get_feature_columns() -> List[str]:
    """Return the ordered feature column names used for model training.

    Returns:
        A new list on every call: the names in ``BASE_NETFLOW_FEATURES``
        followed by the engineered column names listed here.
    """
    columns = list(BASE_NETFLOW_FEATURES)
    columns.extend([
        # Timing regularity
        'jitter_ratio_src',
        'jitter_ratio_dst',
        'jitter_ratio_avg',
        'cv_iat_src',
        'cv_iat_dst',
        'duration_to_iat_ratio',
        # Packet size spread
        'pkt_size_range',
        'flow_pkt_size_range',
        'bytes_per_pkt_in',
        'bytes_per_pkt_out',
        'bytes_per_pkt_ratio',
        'pkt_size_consistency',
        # Volume, asymmetry and rates
        'total_bytes',
        'total_pkts',
        'byte_ratio',
        'pkt_ratio',
        'throughput_ratio',
        'bytes_per_ms',
        'pkts_per_ms',
        # Packet-size histogram
        'small_pkt_ratio',
        'medium_pkt_ratio',
        'large_pkt_ratio',
        'pkt_size_entropy',
        # Transport-level ratios
        'retrans_ratio',
        'tcp_win_ratio',
        'ttl_range',
        # Composite heuristic
        'beacon_score',
    ])
    return columns
|
|
|
|
def prepare_training_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
    """Build the feature matrix and label vector from a raw NetFlow frame.

    Args:
        df: Raw NetFlow frame; passed through
            ``engineer_beaconing_features`` first.

    Returns:
        ``(X, y)`` where ``X`` holds the columns from
        ``get_feature_columns()`` in that order, and ``y`` is chosen as
        follows: the ``'Attack'`` column mapped through ``label_is_c2`` if
        it exists; otherwise the ``'Label'`` column as-is; otherwise an
        all-zero Series on the frame's index.
    """
    enriched = engineer_beaconing_features(df)
    wanted = get_feature_columns()

    # Any feature column still absent after engineering becomes constant 0.
    absent = [name for name in wanted if name not in enriched.columns]
    for name in absent:
        enriched[name] = 0

    features = enriched[wanted]

    if 'Attack' in enriched.columns:
        labels = label_is_c2(enriched['Attack'])
    else:
        labels = enriched.get('Label', pd.Series(0, index=enriched.index))

    return features, labels
|
|