# c2-beaconing-detection / features.py
# dreyna001's picture
# Upload features.py
# 2ac88da verified
"""Feature engineering for C2 beaconing detection from NetFlow/Palo Alto logs."""
import numpy as np
import pandas as pd
from typing import List, Tuple
# Raw NetFlow v3 columns used directly as model inputs (column names as
# provided by the keys-i/netFlow dataset). Order matters: it determines the
# column order of the training matrix.
BASE_NETFLOW_FEATURES = [
    # Flow duration
    'FLOW_DURATION_MILLISECONDS',
    # Byte / packet counters per direction
    'IN_BYTES',
    'IN_PKTS',
    'OUT_BYTES',
    'OUT_PKTS',
    # Packet length extremes
    'LONGEST_FLOW_PKT',
    'SHORTEST_FLOW_PKT',
    'MIN_IP_PKT_LEN',
    'MAX_IP_PKT_LEN',
    # Average throughput per direction
    'SRC_TO_DST_AVG_THROUGHPUT',
    'DST_TO_SRC_AVG_THROUGHPUT',
    # Inter-arrival time (IAT) statistics, source -> destination
    'SRC_TO_DST_IAT_MIN',
    'SRC_TO_DST_IAT_MAX',
    'SRC_TO_DST_IAT_AVG',
    'SRC_TO_DST_IAT_STDDEV',
    # Inter-arrival time (IAT) statistics, destination -> source
    'DST_TO_SRC_IAT_MIN',
    'DST_TO_SRC_IAT_MAX',
    'DST_TO_SRC_IAT_AVG',
    'DST_TO_SRC_IAT_STDDEV',
    # Packet counts bucketed by packet size
    'NUM_PKTS_UP_TO_128_BYTES',
    'NUM_PKTS_128_TO_256_BYTES',
    'NUM_PKTS_256_TO_512_BYTES',
    'NUM_PKTS_512_TO_1024_BYTES',
    'NUM_PKTS_1024_TO_1514_BYTES',
    # Retransmitted bytes per direction
    'RETRANSMITTED_IN_BYTES',
    'RETRANSMITTED_OUT_BYTES',
    # Maximum TCP window per direction
    'TCP_WIN_MAX_IN',
    'TCP_WIN_MAX_OUT',
    # TTL extremes
    'MIN_TTL',
    'MAX_TTL',
]
# Attack labels that are treated as C2-like (periodic) traffic when building
# the binary target. The label names follow the CIC-IDS2018 attack categories.
C2_LIKE_ATTACKS = [
    'Bot',
    'Infiltration',
    'Heartbleed',
    'FTP-Patator',
    'SSH-Patator',
]
def label_is_c2(attack_series: pd.Series) -> pd.Series:
    """Turn attack labels into a 0/1 C2-like beaconing flag.

    Args:
        attack_series: Series of attack label values.

    Returns:
        Integer Series aligned with ``attack_series``: 1 where the label is
        one of ``C2_LIKE_ATTACKS``, 0 everywhere else.
    """
    is_c2_like = attack_series.isin(C2_LIKE_ATTACKS)
    return is_c2_like.astype(int)
def engineer_beaconing_features(df: pd.DataFrame) -> pd.DataFrame:
    """Engineer beaconing-specific features from base NetFlow features.

    The input frame is copied, so the caller's frame is never modified.
    27 derived columns (timing/jitter, packet-size consistency, volume and
    rate ratios, packet-size distribution, retransmission, TCP window, TTL
    and a composite ``beacon_score``) are appended to the copy.

    Args:
        df: Frame holding every base NetFlow column read below (IAT
            statistics, byte/packet counters, packet-length extremes,
            throughput, size-bucket counts, retransmitted bytes, TCP window
            maxima and TTL extremes). Extra columns are passed through.

    Returns:
        A new frame with the engineered columns added. In numeric columns,
        ``+/-inf`` and ``NaN`` are replaced by 0. Non-numeric columns
        (labels, addresses, categoricals, ...) are returned untouched.

    Raises:
        KeyError: If a required input column is missing from ``df``.
    """
    df = df.copy()

    # Added to every denominator to avoid division by zero. Note that a zero
    # denominator therefore produces a very large *finite* value, not inf.
    eps = 1e-9

    # ---- Timing / Periodicity Features (Core for beaconing) ----
    df['jitter_ratio_src'] = df['SRC_TO_DST_IAT_STDDEV'] / (df['SRC_TO_DST_IAT_AVG'] + eps)
    df['jitter_ratio_dst'] = df['DST_TO_SRC_IAT_STDDEV'] / (df['DST_TO_SRC_IAT_AVG'] + eps)
    df['jitter_ratio_avg'] = (df['jitter_ratio_src'] + df['jitter_ratio_dst']) / 2

    # Coefficient of variation for IAT. This is the same stddev / mean ratio
    # as the jitter ratios, so the values are reused rather than recomputed;
    # the columns are kept because the feature list exposes both names.
    df['cv_iat_src'] = df['jitter_ratio_src']
    df['cv_iat_dst'] = df['jitter_ratio_dst']

    # Flow duration relative to IAT (beacons often have short flows with consistent IAT)
    df['duration_to_iat_ratio'] = df['FLOW_DURATION_MILLISECONDS'] / (df['SRC_TO_DST_IAT_AVG'] + eps)

    # ---- Size Consistency Features ----
    df['pkt_size_range'] = df['MAX_IP_PKT_LEN'] - df['MIN_IP_PKT_LEN']
    df['flow_pkt_size_range'] = df['LONGEST_FLOW_PKT'] - df['SHORTEST_FLOW_PKT']

    # Bytes per packet ratios
    df['bytes_per_pkt_in'] = df['IN_BYTES'] / (df['IN_PKTS'] + eps)
    df['bytes_per_pkt_out'] = df['OUT_BYTES'] / (df['OUT_PKTS'] + eps)
    df['bytes_per_pkt_ratio'] = df['bytes_per_pkt_in'] / (df['bytes_per_pkt_out'] + eps)

    # Packet size consistency score (low = consistent = beacon-like)
    df['pkt_size_consistency'] = df['pkt_size_range'] / (df['MAX_IP_PKT_LEN'] + eps)

    # ---- Volume / Rate Features ----
    df['total_bytes'] = df['IN_BYTES'] + df['OUT_BYTES']
    df['total_pkts'] = df['IN_PKTS'] + df['OUT_PKTS']
    df['byte_ratio'] = df['OUT_BYTES'] / (df['IN_BYTES'] + eps)
    df['pkt_ratio'] = df['OUT_PKTS'] / (df['IN_PKTS'] + eps)

    # Throughput ratios
    df['throughput_ratio'] = df['SRC_TO_DST_AVG_THROUGHPUT'] / (df['DST_TO_SRC_AVG_THROUGHPUT'] + eps)

    # Flow bytes / packets per millisecond
    df['bytes_per_ms'] = df['total_bytes'] / (df['FLOW_DURATION_MILLISECONDS'] + eps)
    df['pkts_per_ms'] = df['total_pkts'] / (df['FLOW_DURATION_MILLISECONDS'] + eps)

    # ---- Packet Distribution Features ----
    size_bins = [
        'NUM_PKTS_UP_TO_128_BYTES',
        'NUM_PKTS_128_TO_256_BYTES',
        'NUM_PKTS_256_TO_512_BYTES',
        'NUM_PKTS_512_TO_1024_BYTES',
        'NUM_PKTS_1024_TO_1514_BYTES',
    ]
    # Sum of the bucket counts (distinct from the 'total_pkts' column, which
    # is IN_PKTS + OUT_PKTS).
    binned_pkts = df[size_bins].sum(axis=1) + eps
    df['small_pkt_ratio'] = df['NUM_PKTS_UP_TO_128_BYTES'] / binned_pkts
    df['medium_pkt_ratio'] = (
        df['NUM_PKTS_128_TO_256_BYTES'] + df['NUM_PKTS_256_TO_512_BYTES']
    ) / binned_pkts
    df['large_pkt_ratio'] = (
        df['NUM_PKTS_512_TO_1024_BYTES'] + df['NUM_PKTS_1024_TO_1514_BYTES']
    ) / binned_pkts

    # Shannon entropy of the packet size distribution
    # (low = focused = beacon-like). eps keeps log2(0) out of the sum.
    pkt_dist = df[size_bins].div(binned_pkts, axis=0)
    df['pkt_size_entropy'] = -(pkt_dist * np.log2(pkt_dist + eps)).sum(axis=1)

    # ---- Retransmission / Error Features ----
    df['retrans_ratio'] = (
        df['RETRANSMITTED_IN_BYTES'] + df['RETRANSMITTED_OUT_BYTES']
    ) / (df['total_bytes'] + eps)

    # ---- TCP Features ----
    df['tcp_win_ratio'] = df['TCP_WIN_MAX_OUT'] / (df['TCP_WIN_MAX_IN'] + eps)

    # ---- TTL Features ----
    df['ttl_range'] = df['MAX_TTL'] - df['MIN_TTL']

    # ---- Beaconing Composite Score ----
    # Low jitter + consistent packet sizes + low volume = high beacon score
    df['beacon_score'] = (
        (1.0 / (1.0 + df['jitter_ratio_avg'])) * 0.4 +
        (1.0 / (1.0 + df['pkt_size_consistency'])) * 0.3 +
        (1.0 / (1.0 + df['total_bytes'] / 1000.0)) * 0.3
    )

    # Replace inf / NaN with 0, but only in numeric columns: filling the whole
    # frame would overwrite missing labels/addresses with 0 and fails on
    # categorical columns whose categories do not include 0.
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        df[numeric_cols] = (
            df[numeric_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
        )
    return df
def get_feature_columns() -> List[str]:
    """List every model input column: base NetFlow columns, then engineered ones.

    Returns:
        A new list on each call, with ``BASE_NETFLOW_FEATURES`` first and the
        engineered feature names after it, so callers may modify the result
        without affecting the module-level constant.
    """
    engineered_names = [
        # Timing / periodicity
        'jitter_ratio_src',
        'jitter_ratio_dst',
        'jitter_ratio_avg',
        'cv_iat_src',
        'cv_iat_dst',
        'duration_to_iat_ratio',
        # Size consistency
        'pkt_size_range',
        'flow_pkt_size_range',
        'bytes_per_pkt_in',
        'bytes_per_pkt_out',
        'bytes_per_pkt_ratio',
        'pkt_size_consistency',
        # Volume / rate
        'total_bytes',
        'total_pkts',
        'byte_ratio',
        'pkt_ratio',
        'throughput_ratio',
        'bytes_per_ms',
        'pkts_per_ms',
        # Packet size distribution
        'small_pkt_ratio',
        'medium_pkt_ratio',
        'large_pkt_ratio',
        'pkt_size_entropy',
        # Retransmission, TCP window, TTL
        'retrans_ratio',
        'tcp_win_ratio',
        'ttl_range',
        # Composite score
        'beacon_score',
    ]
    return [*BASE_NETFLOW_FEATURES, *engineered_names]
def prepare_training_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
    """Prepare features and labels from raw NetFlow dataframe.

    Args:
        df: Raw NetFlow frame. Base feature columns that are absent are
            treated as all-zero. The caller's frame is not modified.

    Returns:
        ``(X, y)`` where ``X`` holds the columns from
        ``get_feature_columns()`` in that order, and ``y`` is:
        the binary C2 flag derived from ``'Attack'`` when that column exists;
        otherwise the ``'Label'`` column as-is; otherwise an all-zero Series
        on the frame's index.
    """
    # Missing base columns must be filled *before* feature engineering, which
    # reads them by name and would otherwise raise KeyError. assign() returns
    # a new frame, so the caller's frame stays untouched.
    missing_base = [col for col in BASE_NETFLOW_FEATURES if col not in df.columns]
    if missing_base:
        df = df.assign(**{col: 0 for col in missing_base})

    df = engineer_beaconing_features(df)
    feature_cols = get_feature_columns()

    # Safety net for any listed feature the engineering step did not produce.
    for col in feature_cols:
        if col not in df.columns:
            df[col] = 0
    X = df[feature_cols]

    if 'Attack' in df.columns:
        y = label_is_c2(df['Attack'])
    else:
        y = df.get('Label', pd.Series(0, index=df.index))
    return X, y