# dominant_data.py — Grafo bancário para anomaly detection sem labels import numpy as np import torch from torch_geometric.data import Data import pandas as pd def gerar_grafo_anomaly( n_nos=500, n_arestas=2500, n_features=16, taxa_anomalia=0.05, seed=42 ): """ Gera grafo atribuído bancário com anomalias embutidas. Tipos de anomalia (sem labels no treino): - Structural: nó com padrão de conexão anômalo (hub suspeito, isolado) - Attribute: nó com features fora da distribuição normal - Combined: ambos ao mesmo tempo (fraude real) O DOMINANT detecta todos sem ver nenhum label. """ np.random.seed(seed) torch.manual_seed(seed) n_anomalias = int(n_nos * taxa_anomalia) anomaly_idx = np.random.choice(n_nos, n_anomalias, replace=False) anomaly_set = set(anomaly_idx.tolist()) # ── FEATURES DOS NÓS ───────────────────────────────────── # Contas bancárias: saldo, limite, age, tx_count, avg_valor... feats = np.zeros((n_nos, n_features)) feat_names = [ 'saldo_norm', 'limite_norm', 'idade_norm', 'tx_mes_norm', 'avg_valor_norm', 'max_valor_norm', 'dias_ativo_norm', 'n_dest_distintos_norm', 'tx_noturnas_ratio', 'tx_internac_ratio', 'score_credito', 'renda_norm', 'tempo_cliente_norm', 'chargeback_ratio', 'dispositivos_distintos', 'pais_origem', ] for i in range(n_nos): if i in anomaly_set: # Anomalia de atributo — valores fora da distribuição feats[i] = np.random.normal(0, 1, n_features) feats[i, 0] = np.random.uniform(0.9, 1.0) # saldo extremo feats[i, 3] = np.random.uniform(0.8, 1.0) # muitas tx feats[i, 8] = np.random.uniform(0.7, 1.0) # muitas noturnas feats[i, 13] = np.random.uniform(0.5, 1.0) # chargeback alto else: # Nó normal — distribuição realista feats[i, 0] = np.random.beta(2, 5) # saldo feats[i, 1] = np.random.beta(3, 4) # limite feats[i, 2] = np.clip(np.random.normal(0.45, 0.15), 0, 1) # idade feats[i, 3] = np.random.beta(2, 6) # tx_mes feats[i, 4] = np.random.beta(3, 5) # avg_valor feats[i, 5] = np.random.beta(2, 8) # max_valor feats[i, 6] = np.random.beta(5, 2) # dias_ativo feats[i, 7] = np.random.beta(2, 7) # dest_distintos feats[i, 8] = np.random.beta(1, 9) # noturnas feats[i, 9] = np.random.beta(1, 15) # internac feats[i, 10] = np.random.beta(6, 2) # score feats[i, 11] = np.random.beta(3, 4) # renda feats[i, 12] = np.random.beta(5, 3) # tempo_cliente feats[i, 13] = np.random.beta(1, 20) # chargeback feats[i, 14] = np.random.beta(2, 8) # dispositivos feats[i, 15] = np.random.binomial(1, 0.1) # pais_origem # ── ARESTAS ─────────────────────────────────────────────── # Grafo de transações: aresta = transferência entre contas src_list, dst_list = [], [] # Nós normais: conexões aleatórias com preferential attachment leve grau = np.ones(n_nos) for _ in range(n_arestas): # Preferential attachment prob = grau / grau.sum() s = np.random.choice(n_nos, p=prob) d = np.random.choice(n_nos) if s != d: src_list.append(s) dst_list.append(d) grau[s] += 1 grau[d] += 1 # Anomalias estruturais: hub (muitas conexões) ou isolado for idx in anomaly_idx[:n_anomalias//2]: # Hub suspeito — conecta com muitos nós diferentes n_extra = np.random.randint(10, 25) targets = np.random.choice( [i for i in range(n_nos) if i != idx], n_extra, replace=False) for t in targets: src_list.append(idx) dst_list.append(t) edge_index = torch.LongTensor([src_list, dst_list]) x = torch.FloatTensor(feats) # Labels APENAS para avaliação (não usados no treino) labels = torch.zeros(n_nos, dtype=torch.long) labels[anomaly_idx] = 1 data = Data(x=x, edge_index=edge_index, y=labels) data.feat_names = feat_names data.anomaly_idx = anomaly_idx return data, feat_names def get_adj_normalizada(edge_index, n_nos): """Adjacência normalizada para GCN: D^-1/2 A D^-1/2""" # Adiciona self-loops self_loops = torch.arange(n_nos).unsqueeze(0).repeat(2, 1) ei = torch.cat([edge_index, self_loops], dim=1) # Grau deg = torch.zeros(n_nos) deg.scatter_add_(0, ei[0], torch.ones(ei.shape[1])) deg_inv_sqrt = deg.pow(-0.5) deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0 # Pesos normalizados row, col = ei norm = deg_inv_sqrt[row] * deg_inv_sqrt[col] return ei, norm, n_nos