from __future__ import annotations

import numpy as np
import pandas as pd

# NOTE(review): presumably the whitelist counterpart of _BLOCKED_COLS; nothing
# in this module reads it, so confirm against callers before changing it.
_ALLOWED_EDGE_COLS = frozenset({
    "sender_id", "receiver_id", "timestamp", "amount",
    "fail_prob", "failed", "is_retry", "neighbor_score",
    "risk_score", "txn_type", "pair_freq", "risk_noisy",
    "txn_count_10", "amount_sum_10", "is_fraud",
})

# Columns that graph builder must never receive (oracle/audit only)
_BLOCKED_COLS = frozenset({
    "motif_hit_count", "motif_source", "trigger_event_idx",
    "label_event_idx", "label_delay", "is_fallback_label",
    "fraud_source", "twin_role", "twin_label", "twin_pair_id",
    "template_id", "dynamic_fraud_state", "motif_chain_state",
    "motif_strength",
})


def build_edge_index(df: pd.DataFrame) -> np.ndarray:
    """Return the int64 edge index of shape ``(2, len(df))``.

    Row 0 holds ``sender_id`` and row 1 holds ``receiver_id``. Edges keep the
    row order of *df*; no sorting is applied here.
    """
    src = df["sender_id"].values.astype(np.int64)
    dst = df["receiver_id"].values.astype(np.int64)
    return np.vstack([src, dst])


def _optional_float_column(df: pd.DataFrame, name: str, n_rows: int) -> np.ndarray:
    """Return column *name* as float32, or ``n_rows`` zeros when it is absent."""
    if name in df.columns:
        return df[name].to_numpy(dtype=np.float32)
    return np.zeros(n_rows, dtype=np.float32)


def build_edge_features(df: pd.DataFrame) -> np.ndarray:
    """Build the float32 edge feature matrix of shape ``(len(df), 9)``.

    Rows are ordered by ascending ``timestamp``. The sort is stable, so a
    frame that is already time-sorted keeps its exact row order; this matters
    because ``build_edge_index`` and ``build_labels`` do not sort, and the
    three outputs only line up when the caller passes a time-sorted frame.

    Columns, in order:
        0. ``amount``
        1. ``0.5 * log1p(seconds since the sender's previous edge)``
           (0 for a sender's first edge; negative gaps are clipped to 0)
        2. ``fail_prob``       (zeros if the column is missing)
        3. ``failed``          (zeros if the column is missing)
        4. ``is_retry``        (zeros if the column is missing)
        5. ``neighbor_score``  (zeros if the column is missing)
        6. ``log1p`` of the sender's number of earlier edges
        7. ``log1p`` of the receiver's number of earlier edges
        8. ``log1p`` of the number of earlier edges for the same
           (sender, receiver) pair

    All history features only look at strictly earlier rows in time order.

    Raises:
        AssertionError: if *df* contains any column listed in
            ``_BLOCKED_COLS``.
        KeyError: if ``sender_id``, ``receiver_id``, ``timestamp`` or
            ``amount`` is missing.
    """
    leaked = _BLOCKED_COLS & set(df.columns)
    if leaked:
        # Raised explicitly instead of via ``assert`` so the leak guard is not
        # stripped under ``python -O``; the exception type is kept for callers.
        raise AssertionError(
            f"Oracle columns leaked into build_edge_features: {leaked}"
        )

    # sort_values returns a new frame, so the caller's df is never mutated.
    # mergesort is stable: rows with equal timestamps keep their input order.
    ordered = df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)
    n = len(ordered)

    # Timestamps stay in float64: float32 has a spacing of 128 at epoch-second
    # magnitudes, which would collapse short inter-event gaps to zero.
    keys = pd.DataFrame({
        "sender": ordered["sender_id"].to_numpy(dtype=np.int64),
        "receiver": ordered["receiver_id"].to_numpy(dtype=np.int64),
        "timestamp": ordered["timestamp"].to_numpy(dtype=np.float64),
    })
    by_sender = keys.groupby("sender", sort=False)

    # Gap to the same sender's previous edge; the first edge (NaN diff) and any
    # NaN arithmetic count as a zero gap, and gaps never go negative.
    gap = by_sender["timestamp"].diff().fillna(0.0).clip(lower=0.0)
    time_delta = (np.log1p(gap.to_numpy()) * 0.5).astype(np.float32)

    # cumcount() numbers rows within each group from 0, i.e. it is exactly the
    # count of earlier rows that share the key.
    sender_degree_feat = np.log1p(
        by_sender.cumcount().to_numpy()
    ).astype(np.float32)
    receiver_degree_feat = np.log1p(
        keys.groupby("receiver", sort=False).cumcount().to_numpy()
    ).astype(np.float32)
    pair_freq_feat = np.log1p(
        keys.groupby(["sender", "receiver"], sort=False).cumcount().to_numpy()
    ).astype(np.float32)

    edge_attr = np.stack([
        ordered["amount"].to_numpy(dtype=np.float32),
        time_delta,
        _optional_float_column(ordered, "fail_prob", n),
        _optional_float_column(ordered, "failed", n),
        _optional_float_column(ordered, "is_retry", n),
        _optional_float_column(ordered, "neighbor_score", n),
        sender_degree_feat,
        receiver_degree_feat,
        pair_freq_feat,
    ], axis=1)
    return edge_attr.astype(np.float32)


def build_labels(df: pd.DataFrame) -> np.ndarray:
    """Return the ``is_fraud`` column as an int64 array, in the row order of *df*."""
    return df["is_fraud"].values.astype(np.int64)