Spaces:
Sleeping
Sleeping
| from typing import Dict, Any, List, Tuple | |
| import numpy as np | |
| from collections import deque, defaultdict | |
| from sklearn.preprocessing import StandardScaler | |
| def _sum_inputs_btc(tx: Dict[str, Any]) -> float: | |
| s = 0.0 | |
| for v in tx.get("vin", []): | |
| s += float(v.get("prevout_value") or 0.0) / 1e8 | |
| return s | |
| def _sum_outputs_btc(tx: Dict[str, Any]) -> float: | |
| s = 0.0 | |
| for o in tx.get("vout", []): | |
| s += float(o.get("value") or 0.0) / 1e8 | |
| return s | |
| def _compute_distances(n: int, edges: List[Tuple[int,int]], center: int) -> np.ndarray: | |
| # undirected BFS distance | |
| adj = [[] for _ in range(n)] | |
| for u,v in edges: | |
| adj[u].append(v); adj[v].append(u) | |
| dist = np.full(n, fill_value=-1, dtype=np.int32) | |
| q = deque([center]); dist[center] = 0 | |
| while q: | |
| u = q.popleft() | |
| for nb in adj[u]: | |
| if dist[nb] == -1: | |
| dist[nb] = dist[u] + 1 | |
| q.append(nb) | |
| return dist | |
| def build_features(nodes: List[str], edges: List[Tuple[int,int]], center_idx: int, node_meta: Dict[str, Dict[str, Any]]): | |
| n = len(nodes) | |
| # degrees | |
| out_deg = np.zeros(n, dtype=np.float32) | |
| in_deg = np.zeros(n, dtype=np.float32) | |
| for u,v in edges: | |
| out_deg[u] += 1 | |
| in_deg[v] += 1 | |
| deg = in_deg + out_deg | |
| ratio_in_out = in_deg / (out_deg + 1e-6) | |
| # sums & counts from metadata | |
| sum_in_btc = np.zeros(n, dtype=np.float32) | |
| sum_out_btc = np.zeros(n, dtype=np.float32) | |
| n_inputs = np.zeros(n, dtype=np.float32) | |
| n_outputs = np.zeros(n, dtype=np.float32) | |
| block_height = np.zeros(n, dtype=np.float32) | |
| for idx, txid in enumerate(nodes): | |
| meta = node_meta.get(txid) or {} | |
| n_inputs[idx] = float(len(meta.get("vin", []) or [])) | |
| n_outputs[idx] = float(len(meta.get("vout", []) or [])) | |
| sum_in_btc[idx] = _sum_inputs_btc(meta) | |
| sum_out_btc[idx] = _sum_outputs_btc(meta) | |
| bh = meta.get("block_height") | |
| block_height[idx] = float(bh) if bh is not None else 0.0 | |
| log_sum_in = np.log1p(sum_in_btc) | |
| log_sum_out = np.log1p(sum_out_btc) | |
| distance = _compute_distances(n, edges, center_idx) | |
| feats = np.stack([ | |
| in_deg, out_deg, deg, ratio_in_out, | |
| n_inputs, n_outputs, | |
| sum_in_btc, sum_out_btc, | |
| log_sum_in, log_sum_out, | |
| distance.astype(np.float32), | |
| block_height | |
| ], axis=1) | |
| feature_names = [ | |
| "in_degree","out_degree","degree","ratio_in_out", | |
| "n_inputs","n_outputs", | |
| "sum_in_btc","sum_out_btc", | |
| "log_sum_in","log_sum_out", | |
| "distance","block_height", | |
| ] | |
| return feats, feature_names | |
| def scale_features(X: np.ndarray, scaler=None): | |
| if scaler is None: | |
| scaler = StandardScaler() | |
| Xs = scaler.fit_transform(X) | |
| note = "Fitted new StandardScaler on ego-subgraph (domain shift vs Elliptic)." | |
| else: | |
| Xs = scaler.transform(X) | |
| note = "Used provided scaler from model repo." | |
| return Xs.astype("float32"), scaler, note | |