File size: 3,675 Bytes
a3682cf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | from __future__ import annotations
import numpy as np
import pandas as pd
# Edge-level column names. NOTE(review): presumably the whitelist of columns
# the graph builder may consume; none of the functions visible alongside it
# reference this set, so confirm where (or whether) it is enforced.
_ALLOWED_EDGE_COLS = frozenset(
    (
        "sender_id",
        "receiver_id",
        "timestamp",
        "amount",
        "fail_prob",
        "failed",
        "is_retry",
        "neighbor_score",
        "risk_score",
        "txn_type",
        "pair_freq",
        "risk_noisy",
        "txn_count_10",
        "amount_sum_10",
        "is_fraud",
    )
)

# Columns that graph builder must never receive (oracle/audit only).
# build_edge_features() rejects any frame that still carries one of these.
_BLOCKED_COLS = frozenset(
    (
        "motif_hit_count",
        "motif_source",
        "trigger_event_idx",
        "label_event_idx",
        "label_delay",
        "is_fallback_label",
        "fraud_source",
        "twin_role",
        "twin_label",
        "twin_pair_id",
        "template_id",
        "dynamic_fraud_state",
        "motif_chain_state",
        "motif_strength",
    )
)
def build_edge_index(df: pd.DataFrame) -> np.ndarray:
    """Assemble the ``(2, num_edges)`` int64 edge index for the transactions.

    Row 0 holds every row's ``sender_id`` and row 1 its ``receiver_id``, both
    in the frame's existing row order.
    """
    endpoints = [
        df[column].values.astype(np.int64)
        for column in ("sender_id", "receiver_id")
    ]
    return np.vstack(endpoints)
def _float_column_or_zeros(df: pd.DataFrame, column: str) -> np.ndarray:
    """Return ``df[column]`` as float32, or all zeros when the column is absent."""
    if column in df.columns:
        return df[column].to_numpy(dtype=np.float32)
    return np.zeros(len(df), dtype=np.float32)


def build_edge_features(df: pd.DataFrame) -> np.ndarray:
    """Build the per-transaction edge feature matrix.

    History features are causal: rows are visited in ascending ``timestamp``
    order and each row only sees the rows that precede it. Rows with equal
    timestamps are visited in their input order (stable sort). Results are
    written back to each row's position in ``df``, so row ``i`` of the output
    describes row ``i`` of the input and lines up with ``build_edge_index``
    and ``build_labels`` even when ``df`` is not pre-sorted by time.

    Output columns, in order:

    0. ``amount`` (as given)
    1. ``0.5 * log1p(gap)``, where ``gap`` is the time since the sender's
       previous transaction (0 for the sender's first one, never negative)
    2. ``fail_prob``
    3. ``failed``
    4. ``is_retry``
    5. ``neighbor_score``
    6. ``log1p`` of the sender's number of earlier transactions
    7. ``log1p`` of the receiver's number of earlier transactions
    8. ``log1p`` of the number of earlier transactions for the same
       (sender, receiver) pair

    Columns 2-5 are filled with zeros when the source column is missing.

    Args:
        df: Transactions with ``sender_id``, ``receiver_id``, ``timestamp``
            and ``amount`` columns. It is not modified.

    Returns:
        A float32 array of shape ``(len(df), 9)``.

    Raises:
        AssertionError: If ``df`` carries any column listed in
            ``_BLOCKED_COLS``.
        KeyError: If a required column is missing.
    """
    leaked = _BLOCKED_COLS & set(df.columns)
    if leaked:
        # Raised explicitly rather than via ``assert`` so the leak guard still
        # runs under ``python -O``; the exception type is unchanged for callers.
        raise AssertionError(
            f"Oracle columns leaked into build_edge_features: {leaked}"
        )

    n = len(df)
    sender_ids = df["sender_id"].to_numpy(dtype=np.int64)
    receiver_ids = df["receiver_id"].to_numpy(dtype=np.int64)
    # Keep timestamps in float64 until after differencing: float32 has a
    # 24-bit significand, so epoch-scale values would be rounded to steps of
    # ~128 and short gaps between transactions would collapse to zero.
    timestamps = df["timestamp"].to_numpy(dtype=np.float64)

    # Input-row positions in chronological order. The stable sort keeps tied
    # timestamps in input order, which makes the history counts deterministic.
    chronological = np.argsort(timestamps, kind="stable")

    last_sender_time: dict[int, float] = {}
    sender_degree: dict[int, int] = {}
    receiver_degree: dict[int, int] = {}
    pair_count: dict[tuple[int, int], int] = {}

    time_delta = np.zeros(n, dtype=np.float32)
    sender_degree_feat = np.zeros(n, dtype=np.float32)
    receiver_degree_feat = np.zeros(n, dtype=np.float32)
    pair_freq_feat = np.zeros(n, dtype=np.float32)

    # ``tolist()`` converts to plain Python ints/floats once, up front, instead
    # of re-wrapping every NumPy scalar inside the loop.
    events = zip(
        chronological.tolist(),
        sender_ids[chronological].tolist(),
        receiver_ids[chronological].tolist(),
        timestamps[chronological].tolist(),
    )
    for row, sender, receiver, stamp in events:
        previous = last_sender_time.get(sender)
        gap = 0.0 if previous is None else max(0.0, stamp - previous)
        time_delta[row] = np.log1p(gap) * 0.5
        last_sender_time[sender] = stamp

        pair = (sender, receiver)
        sent_before = sender_degree.get(sender, 0)
        received_before = receiver_degree.get(receiver, 0)
        pair_before = pair_count.get(pair, 0)

        # Record the counts seen *before* this transaction, then update them.
        sender_degree_feat[row] = np.log1p(sent_before)
        receiver_degree_feat[row] = np.log1p(received_before)
        pair_freq_feat[row] = np.log1p(pair_before)

        sender_degree[sender] = sent_before + 1
        receiver_degree[receiver] = received_before + 1
        pair_count[pair] = pair_before + 1

    # Every input below is already float32, so the stacked result is too.
    return np.stack(
        [
            df["amount"].to_numpy(dtype=np.float32),
            time_delta,
            _float_column_or_zeros(df, "fail_prob"),
            _float_column_or_zeros(df, "failed"),
            _float_column_or_zeros(df, "is_retry"),
            _float_column_or_zeros(df, "neighbor_score"),
            sender_degree_feat,
            receiver_degree_feat,
            pair_freq_feat,
        ],
        axis=1,
    )
def build_labels(df: pd.DataFrame) -> np.ndarray:
    """Return the ``is_fraud`` column as an int64 array, in the frame's row order."""
    fraud_flags = df["is_fraud"].values
    return fraud_flags.astype(np.int64)
|