# cyb006-baseline-classifier / feature_engineering.py
# pradeep-xpert's picture
# Initial release: XGBoost + MLP for user-risk-tier classification, plus structural-leakage diagnostic on threat-actor detection
# e6a6835 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB006 baseline classifier.
Predicts `user_risk_tier` (3-class: low / medium / high) from per-user
identity aggregates on the CYB006 sample dataset.
CSV inputs:
user_risk_summary.csv (primary, per-user aggregates, 200 rows)
login_sessions.csv (per-session telemetry, joined as
per-user behavioural aggregates)
identity_topology.csv (identity domain registry; reserved for
future work - no direct user join key)
auth_events.csv (discrete event log; reserved for
future work)
Target classes (3):
low, medium, high
Why this task instead of threat_actor_capability_tier
-----------------------------------------------------
The CYB006 README lists "threat-actor tier classification (4-class)" as
its primary suggested use case. We piloted that target first and found
the sample dataset has STRUCTURAL DETERMINISM: every actor-tier signal
in the data (velocity_anomaly_score, session_timestamp, credential
attempt count, login outcome, geo country code, device trust level,
user risk tier itself, geo anomaly score) carries non-overlapping
distributions between threat and legitimate sessions. As a result, a
plain XGBoost achieves 100% test accuracy on threat-actor binary
classification across every random seed - and stays at 97-100%
accuracy even with all six oracle feature groups removed.
This is not a methodological failure; it's a property of how the
sample was generated. Real-world identity telemetry has substantial
overlap between threat-actor and legitimate behaviour. The model card
documents this as a diagnostic finding for the dataset author and a
caveat for buyers planning to train detection models on the sample.
For a working baseline that demonstrates honest ML on the dataset, we
shifted to predicting `user_risk_tier` from per-user aggregates. This
task has overlapping per-tier feature distributions, no oracle features,
and lifts modestly over majority baseline (acc 0.66 vs 0.57 majority).
Public API
----------
build_features(user_risk_path, sessions_path) -> (X, y, ids, meta)
transform_single(record, meta) -> np.ndarray
compute_session_aggregates_for_user(user_sessions) -> dict
save_meta(meta, path) / load_meta(path)
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered low -> high. Note: CYB006 README claims a 4th tier 'critical' but
# the sample data contains only 3 (low, medium, high).
LABEL_ORDER = ["low", "medium", "high"]
# Both lookup tables are derived from LABEL_ORDER so the integer code of a
# label is always its position in that list.
LABEL_TO_INT = {label: code for code, label in enumerate(LABEL_ORDER)}
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
# Identifier columns; build_features drops them from the user frame before
# any feature is assembled.
ID_COLUMNS = ["user_id"]
# Column holding the tier label; build_features maps it through LABEL_TO_INT
# and raises ValueError on any value outside that mapping.
TARGET_COLUMN = "user_risk_tier"
# ---------------------------------------------------------------------------
# Per-user numeric features from user_risk_summary.csv
# ---------------------------------------------------------------------------
# These are aggregate counts and continuous scores. They carry overlapping
# distributions across tiers - not oracles.
# Numeric columns taken from the per-user table. build_features casts them to
# float and skips any name that is absent from the loaded frame; several of
# them are also read directly by _add_engineered_features.
USER_NUMERIC_FEATURES = [
    "total_login_attempts",
    "successful_logins",
    "failed_logins",
    "mfa_failures",
    "impossible_travel_events",
    "lateral_hop_count",
    "privilege_escalations",
    "account_lockout_count",
    "geo_dispersion_score",
    "login_velocity_score",
    "session_anomaly_rate",
    "ueba_alert_count",
    "overall_identity_risk_score",
    "insider_threat_indicator_score",
]
# Categorical columns. build_features one-hot encodes each one using the
# sorted non-null levels observed in the loaded data and records those levels
# in meta["categorical_levels"].
USER_CATEGORICAL_FEATURES = [
    "peak_privilege_level_accessed", # 6 values
]
# Note: we intentionally exclude `threat_actor_flag`, `account_takeover_flag`,
# and `credential_attack_victim_flag` from user_risk_summary as features.
# threat_actor_flag is a perfect oracle for whether tier=high (only high-tier
# users can be flagged threat actors). account_takeover and credential_attack
# are extremely rare (2/200 and 1/200) - not useful as features in the
# sample, and using them risks the same kind of structural leakage we
# documented for threat-actor classification.
# Dropped from the user frame by build_features (with errors="ignore", so a
# missing column is tolerated) and recorded in meta["user_leaky_excluded"].
USER_LEAKY_COLUMNS = [
    "threat_actor_flag",
    "account_takeover_flag",
    "credential_attack_victim_flag",
]
# ---------------------------------------------------------------------------
# Per-session aggregates joined into the user-level row
# ---------------------------------------------------------------------------
# We compute these from login_sessions.csv aggregated by user_id. They add
# behavioural color (avg session duration, fraction of sessions with
# impossible travel, etc.) without introducing leakage. We explicitly
# exclude session-level columns that exhibit non-overlap with threat actors
# (velocity_anomaly_score, session_timestamp_utc, credential_attempt_count,
# login_outcome) because those features create degenerate signal even when
# aggregated, and would compromise the user_risk_tier evaluation by
# enabling shortcuts via the threat_actor_flag-correlated structure.
# Names of the per-user session aggregates. They match the columns produced
# by _aggregate_sessions and the keys returned by
# compute_session_aggregates_for_user; build_features appends them to the
# numeric feature list.
SESSION_AGGS_NUMERIC = [
    "avg_session_duration_seconds",
    "avg_mfa_response_latency_ms",
    "avg_geo_anomaly_score",
    "max_geo_anomaly_score",
    "frac_impossible_travel",
    "n_unique_countries",
    "n_unique_devices",
    "n_unique_applications",
]
def _aggregate_sessions(sessions: pd.DataFrame) -> pd.DataFrame:
"""Compute per-user session aggregates without using leaky features."""
g = sessions.groupby("user_id")
aggs = pd.DataFrame({
"avg_session_duration_seconds": g["session_duration_seconds"].mean(),
"avg_mfa_response_latency_ms": g["mfa_response_latency_ms"].mean(),
"avg_geo_anomaly_score": g["geo_anomaly_score"].mean(),
"max_geo_anomaly_score": g["geo_anomaly_score"].max(),
"frac_impossible_travel": g["impossible_travel_flag"].mean(),
"n_unique_countries": g["geo_country_code"].nunique(),
"n_unique_devices": g["device_id_hash"].nunique(),
"n_unique_applications": g["target_application_id"].nunique(),
}).reset_index()
return aggs
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Six engineered features that combine the raw aggregates into
risk-discriminative composites. None encode the target directly.
"""
df = df.copy()
# 1. Failed-login fraction. Common signal across all risk tiers but
# high-tier users have systematically more failures.
denom = df["total_login_attempts"].clip(lower=1)
df["failed_login_rate"] = (df["failed_logins"] / denom).astype(float)
# 2. MFA failure rate per login.
df["mfa_failure_rate"] = (df["mfa_failures"] / denom).astype(float)
# 3. UEBA alerts per session - normalizes alert count to session volume.
sess_denom = df["successful_logins"].clip(lower=1)
df["ueba_alerts_per_session"] = (df["ueba_alert_count"] / sess_denom).astype(float)
# 4. Lateral movement intensity (hops per privilege escalation).
pe_denom = df["privilege_escalations"].clip(lower=1)
df["hops_per_escalation"] = (df["lateral_hop_count"] / pe_denom).astype(float)
# 5. Geo-velocity composite: dispersion x velocity score (continuous).
df["geo_velocity_composite"] = (
df["geo_dispersion_score"] * df["login_velocity_score"]
).astype(float)
# 6. Composite identity-anomaly score: average of risk + insider scores.
df["composite_anomaly_score"] = (
(df["overall_identity_risk_score"] + df["insider_threat_indicator_score"]) / 2.0
).astype(float)
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    user_risk_path: str | Path,
    sessions_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """Build the model matrix for the ``user_risk_tier`` task.

    Reads both CSVs, encodes the target through ``LABEL_TO_INT``, drops the
    identifier, target and leaky columns, left-joins the per-user session
    aggregates, adds the engineered features and one-hot encodes the
    categorical columns.

    Returns ``(X, y, ids, meta)``:
      * ``X``    - feature frame with a fresh 0..n-1 index; remaining NaNs
                   (for example aggregates of users without any session row)
                   are filled with 0.0.
      * ``y``    - integer-encoded target.
      * ``ids``  - the ``user_id`` column of the user table, row-aligned
                   with ``X``.
      * ``meta`` - feature names, numeric feature list, categorical levels
                   and label maps, as consumed by ``transform_single``.

    Raises ``ValueError`` when the target column contains a value that is
    not a key of ``LABEL_TO_INT``.
    """
    raw_users = pd.read_csv(user_risk_path)
    raw_sessions = pd.read_csv(sessions_path)

    # Encode the target first; anything outside the label map becomes NaN.
    encoded = raw_users[TARGET_COLUMN].map(LABEL_TO_INT)
    unmapped = encoded.isna()
    if unmapped.any():
        bad = raw_users.loc[unmapped, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown user_risk_tier values: {bad}")
    y = encoded.astype(int)
    ids = raw_users["user_id"].copy()

    non_feature_columns = [*ID_COLUMNS, TARGET_COLUMN, *USER_LEAKY_COLUMNS]
    frame = raw_users.drop(columns=non_feature_columns, errors="ignore")

    # Join the session aggregates through a temporary key so the identifier
    # never survives as a feature column.
    per_user_sessions = _aggregate_sessions(raw_sessions).rename(
        columns={"user_id": "__user_id__"}
    )
    frame["__user_id__"] = ids
    frame = frame.merge(per_user_sessions, on="__user_id__", how="left")
    frame = frame.drop(columns=["__user_id__"])
    frame = _add_engineered_features(frame)

    engineered_names = [
        "failed_login_rate", "mfa_failure_rate", "ueba_alerts_per_session",
        "hops_per_escalation", "geo_velocity_composite", "composite_anomaly_score",
    ]
    candidates = USER_NUMERIC_FEATURES + SESSION_AGGS_NUMERIC + engineered_names
    # Keep only the candidates that are actually present in the frame.
    numeric_features = [name for name in candidates if name in frame.columns]
    X_numeric = frame[numeric_features].astype(float)

    categorical_levels: dict[str, list[str]] = {}
    parts: list[pd.DataFrame] = [X_numeric.reset_index(drop=True)]
    for col in USER_CATEGORICAL_FEATURES:
        if col in frame.columns:
            # Sorted observed levels fix the one-hot column order.
            levels = sorted(frame[col].dropna().unique().tolist())
            categorical_levels[col] = levels
            typed = frame[col].astype("category").cat.set_categories(levels)
            dummies = pd.get_dummies(typed, prefix=col, dummy_na=False).astype(int)
            parts.append(dummies.reset_index(drop=True))

    X = pd.concat(parts, axis=1).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "user_leaky_excluded": USER_LEAKY_COLUMNS,
    }
    return X, y, ids, meta
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """Encode one or more per-user records for inference.

    Caller is responsible for computing session aggregates (the
    SESSION_AGGS_NUMERIC fields) and passing them in record. See the
    inference notebook for the standard pattern.

    Parameters
    ----------
    record
        A dict describing one user, or a DataFrame with one row per user.
        The DataFrame's index is ignored; output rows follow its row order.
    meta
        Metadata returned by ``build_features`` (or ``load_meta``). The keys
        ``numeric_features``, ``categorical_levels`` and ``feature_names``
        are read.

    Returns
    -------
    np.ndarray
        float32 array of shape ``(n_rows, len(meta["feature_names"]))``.
        A numeric feature missing from ``record`` is encoded as 0.0, as is
        any NaN. A categorical value outside the stored levels encodes as
        all zeros for that feature.

    Raises
    ------
    KeyError
        If ``record`` lacks a raw column that ``_add_engineered_features``
        indexes directly.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Normalise to a 0..n-1 index. The numeric block below is rebuilt
        # from raw values, so a sliced or filtered frame that kept its own
        # labels would misalign in the axis=1 concat and gain spurious
        # zero-filled rows.
        df = record.reset_index(drop=True)
    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )
    blocks: list[pd.DataFrame] = [numeric]

    for col, levels in meta["categorical_levels"].items():
        raw = df.get(col, pd.Series([None] * n_rows, index=df.index))
        typed = raw.astype("category").cat.set_categories(levels)
        block = pd.get_dummies(typed, prefix=col, dummy_na=False).astype(int)
        # One column per training-time level, in training order.
        block = block.reindex(
            columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0
        )
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final guard: exactly the training columns, in training order.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the feature metadata to ``path`` as indented JSON.

    Only the known metadata keys are written. ``int_to_label`` keys are
    stringified because JSON object keys must be strings, and
    ``user_leaky_excluded`` falls back to an empty list when absent.
    """
    payload = {
        key: meta[key]
        for key in (
            "feature_names",
            "numeric_features",
            "categorical_levels",
            "label_to_int",
        )
    }
    payload["int_to_label"] = {
        str(code): label for code, label in meta["int_to_label"].items()
    }
    payload["user_leaky_excluded"] = meta.get("user_leaky_excluded", [])

    with open(path, "w") as f:
        json.dump(payload, f, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by ``save_meta`` and restore integer label keys.

    JSON stores the ``int_to_label`` keys as strings; they are converted
    back to ``int`` so the mapping can be indexed with predicted class ids.
    """
    with open(path) as handle:
        restored = json.load(handle)
    restored["int_to_label"] = {
        int(code): label for code, label in restored["int_to_label"].items()
    }
    return restored
def compute_session_aggregates_for_user(
    user_sessions: pd.DataFrame,
) -> dict:
    """Summarise one user's session rows into the session-aggregate fields.

    ``user_sessions`` is treated as belonging to a single user: every
    statistic is taken over all of its rows. Means and the maximum are
    returned as ``float``, distinct counts as ``int``, keyed by the same
    names used for the training-time aggregates.
    """
    # (output key, source column, Series reducer, Python type of the result)
    recipe = (
        ("avg_session_duration_seconds", "session_duration_seconds", "mean", float),
        ("avg_mfa_response_latency_ms", "mfa_response_latency_ms", "mean", float),
        ("avg_geo_anomaly_score", "geo_anomaly_score", "mean", float),
        ("max_geo_anomaly_score", "geo_anomaly_score", "max", float),
        ("frac_impossible_travel", "impossible_travel_flag", "mean", float),
        ("n_unique_countries", "geo_country_code", "nunique", int),
        ("n_unique_devices", "device_id_hash", "nunique", int),
        ("n_unique_applications", "target_application_id", "nunique", int),
    )
    return {
        key: cast(getattr(user_sessions[source], reducer)())
        for key, source, reducer, cast in recipe
    }
if __name__ == "__main__":
    import sys

    # Data directory: first CLI argument when given, otherwise the default
    # upload location.
    cli_args = sys.argv[1:]
    base = Path(cli_args[0]) if cli_args else Path("/mnt/user-data/uploads")

    X, y, ids, meta = build_features(
        base / "user_risk_summary.csv",
        base / "login_sessions.csv",
    )

    # Smoke-test summary of the built matrix, one line per fact.
    summary_lines = [
        f"X shape: {X.shape}",
        f"y shape: {y.shape}",
        f"n_features: {len(meta['feature_names'])}",
        f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}",
        f"X has NaN: {X.isnull().any().any()}",
    ]
    for line in summary_lines:
        print(line)