| from __future__ import annotations |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
# Column names an edge DataFrame is allowed to carry.
# NOTE(review): not referenced in the visible part of this module; presumably
# consumed as a whitelist elsewhere -- confirm before editing.
_ALLOWED_EDGE_COLS = frozenset((
    "sender_id",
    "receiver_id",
    "timestamp",
    "amount",
    "fail_prob",
    "failed",
    "is_retry",
    "neighbor_score",
    "risk_score",
    "txn_type",
    "pair_freq",
    "risk_noisy",
    "txn_count_10",
    "amount_sum_10",
    "is_fraud",
))

# "Oracle" columns: build_edge_features refuses any frame that contains one
# of these names.
_BLOCKED_COLS = frozenset((
    "motif_hit_count",
    "motif_source",
    "trigger_event_idx",
    "label_event_idx",
    "label_delay",
    "is_fallback_label",
    "fraud_source",
    "twin_role",
    "twin_label",
    "twin_pair_id",
    "template_id",
    "dynamic_fraud_state",
    "motif_chain_state",
    "motif_strength",
))
|
|
|
|
def build_edge_index(df: pd.DataFrame):
    """Return the edge endpoints of ``df`` as a two-row int64 array.

    Row 0 holds ``df["sender_id"]`` and row 1 holds ``df["receiver_id"]``,
    both cast to ``np.int64``. Columns follow the current row order of
    ``df``; no sorting is performed here.
    """
    endpoints = [
        df[column].values.astype(np.int64)
        for column in ("sender_id", "receiver_id")
    ]
    return np.vstack(endpoints)
|
|
|
|
def build_edge_features(df: pd.DataFrame) -> np.ndarray:
    """Build the per-edge float32 feature matrix for a transaction frame.

    The frame is sorted by ``timestamp`` (stable sort, so rows with equal
    timestamps keep their input order) and one feature row is produced per
    edge, **in that sorted order**. Callers that pair the result with arrays
    built from the unsorted frame must sort the frame the same way first.

    Output columns, in order:

    0. ``amount``
    1. ``0.5 * log1p(seconds-since-sender's-previous-edge)`` (0 for a sender's
       first edge; negative gaps are clamped to 0). The unit is whatever
       ``timestamp`` is expressed in.
    2. ``fail_prob``       (zeros when the column is absent)
    3. ``failed``          (zeros when the column is absent)
    4. ``is_retry``        (zeros when the column is absent)
    5. ``neighbor_score``  (zeros when the column is absent)
    6. ``log1p`` of the number of earlier edges sent by this sender
    7. ``log1p`` of the number of earlier edges received by this receiver
    8. ``log1p`` of the number of earlier edges on this (sender, receiver) pair

    Columns 1 and 6-8 only look at rows strictly earlier in the sorted order,
    so the current edge never contributes to its own features.

    Args:
        df: Edge frame with at least ``sender_id``, ``receiver_id``,
            ``timestamp`` and ``amount``. It is not modified.

    Returns:
        ``np.ndarray`` of shape ``(len(df), 9)`` and dtype ``float32``.

    Raises:
        AssertionError: if ``df`` contains any column listed in
            ``_BLOCKED_COLS``.
        KeyError: if a required column is missing.
    """
    leaked = _BLOCKED_COLS & set(df.columns)
    if leaked:
        # Raised explicitly (rather than via ``assert``) so the leakage guard
        # still fires under ``python -O``; the exception type is unchanged.
        raise AssertionError(
            f"Oracle columns leaked into build_edge_features: {leaked}"
        )

    # sort_values already returns a new frame, so no defensive copy is needed.
    # mergesort is stable: tied timestamps keep their input order, which keeps
    # the running counters below deterministic.
    df = df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)
    n = len(df)

    time_delta, sender_degree, receiver_degree, pair_freq = _causal_history_features(
        df["sender_id"].to_numpy(dtype=np.int64),
        df["receiver_id"].to_numpy(dtype=np.int64),
        # float64 on purpose: float32 cannot resolve second-level gaps between
        # epoch-sized timestamps.
        df["timestamp"].to_numpy(dtype=np.float64),
    )

    feature_columns = [
        df["amount"].to_numpy(dtype=np.float32),
        time_delta,
        _optional_float_column(df, "fail_prob", n),
        _optional_float_column(df, "failed", n),
        _optional_float_column(df, "is_retry", n),
        _optional_float_column(df, "neighbor_score", n),
        sender_degree,
        receiver_degree,
        pair_freq,
    ]
    return np.stack(feature_columns, axis=1).astype(np.float32)


def _optional_float_column(df: pd.DataFrame, column: str, n: int) -> np.ndarray:
    """Return ``df[column]`` as float32, or ``n`` zeros if the column is absent."""
    if column in df.columns:
        return df[column].to_numpy(dtype=np.float32)
    return np.zeros(n, dtype=np.float32)


def _causal_history_features(sender_ids, receiver_ids, timestamps):
    """Compute strictly-past per-edge statistics in a single ordered pass.

    Inputs are equal-length 1-D arrays already in processing order. Returns a
    4-tuple of float32 arrays: scaled log time gap, log sender out-degree,
    log receiver in-degree and log pair frequency, each counting only earlier
    rows.
    """
    n = len(timestamps)
    gaps = np.zeros(n, dtype=np.float64)
    sent_before = np.zeros(n, dtype=np.int64)
    received_before = np.zeros(n, dtype=np.int64)
    pair_before = np.zeros(n, dtype=np.int64)

    last_sent_at: dict[int, float] = {}
    sent_count: dict[int, int] = {}
    received_count: dict[int, int] = {}
    pair_count: dict[tuple[int, int], int] = {}

    # tolist() converts to plain Python ints/floats once, instead of casting
    # each element several times inside the loop.
    rows = zip(sender_ids.tolist(), receiver_ids.tolist(), timestamps.tolist())
    for i, (sender, receiver, ts) in enumerate(rows):
        previous = last_sent_at.get(sender)
        if previous is not None:
            gaps[i] = max(0.0, ts - previous)
        last_sent_at[sender] = ts

        pair = (sender, receiver)
        n_sent = sent_count.get(sender, 0)
        n_received = received_count.get(receiver, 0)
        n_pair = pair_count.get(pair, 0)

        # Record the counts *before* this edge is added to them.
        sent_before[i] = n_sent
        received_before[i] = n_received
        pair_before[i] = n_pair

        sent_count[sender] = n_sent + 1
        received_count[receiver] = n_received + 1
        pair_count[pair] = n_pair + 1

    # NOTE(review): the 0.5 factor on the time gap is carried over unchanged;
    # its rationale is not visible in this file.
    time_delta = (np.log1p(gaps) * 0.5).astype(np.float32)
    return (
        time_delta,
        np.log1p(sent_before).astype(np.float32),
        np.log1p(received_before).astype(np.float32),
        np.log1p(pair_before).astype(np.float32),
    )
|
|
|
|
def build_labels(df: pd.DataFrame):
    """Return the ``is_fraud`` column of ``df`` as an int64 array.

    Values keep the current row order of ``df``; no sorting is performed.
    """
    fraud_flags = df["is_fraud"].values
    return fraud_flags.astype(np.int64)
|
|