Spaces:
Sleeping
Sleeping
| # dominant_data.py — Grafo bancário para anomaly detection sem labels | |
| import numpy as np | |
| import torch | |
| from torch_geometric.data import Data | |
| import pandas as pd | |
| def gerar_grafo_anomaly( | |
| n_nos=500, n_arestas=2500, | |
| n_features=16, taxa_anomalia=0.05, | |
| seed=42 | |
| ): | |
| """ | |
| Gera grafo atribuído bancário com anomalias embutidas. | |
| Tipos de anomalia (sem labels no treino): | |
| - Structural: nó com padrão de conexão anômalo (hub suspeito, isolado) | |
| - Attribute: nó com features fora da distribuição normal | |
| - Combined: ambos ao mesmo tempo (fraude real) | |
| O DOMINANT detecta todos sem ver nenhum label. | |
| """ | |
| np.random.seed(seed) | |
| torch.manual_seed(seed) | |
| n_anomalias = int(n_nos * taxa_anomalia) | |
| anomaly_idx = np.random.choice(n_nos, n_anomalias, replace=False) | |
| anomaly_set = set(anomaly_idx.tolist()) | |
| # ── FEATURES DOS NÓS ───────────────────────────────────── | |
| # Contas bancárias: saldo, limite, age, tx_count, avg_valor... | |
| feats = np.zeros((n_nos, n_features)) | |
| feat_names = [ | |
| 'saldo_norm', 'limite_norm', 'idade_norm', 'tx_mes_norm', | |
| 'avg_valor_norm', 'max_valor_norm', 'dias_ativo_norm', | |
| 'n_dest_distintos_norm', 'tx_noturnas_ratio', 'tx_internac_ratio', | |
| 'score_credito', 'renda_norm', 'tempo_cliente_norm', | |
| 'chargeback_ratio', 'dispositivos_distintos', 'pais_origem', | |
| ] | |
| for i in range(n_nos): | |
| if i in anomaly_set: | |
| # Anomalia de atributo — valores fora da distribuição | |
| feats[i] = np.random.normal(0, 1, n_features) | |
| feats[i, 0] = np.random.uniform(0.9, 1.0) # saldo extremo | |
| feats[i, 3] = np.random.uniform(0.8, 1.0) # muitas tx | |
| feats[i, 8] = np.random.uniform(0.7, 1.0) # muitas noturnas | |
| feats[i, 13] = np.random.uniform(0.5, 1.0) # chargeback alto | |
| else: | |
| # Nó normal — distribuição realista | |
| feats[i, 0] = np.random.beta(2, 5) # saldo | |
| feats[i, 1] = np.random.beta(3, 4) # limite | |
| feats[i, 2] = np.clip(np.random.normal(0.45, 0.15), 0, 1) # idade | |
| feats[i, 3] = np.random.beta(2, 6) # tx_mes | |
| feats[i, 4] = np.random.beta(3, 5) # avg_valor | |
| feats[i, 5] = np.random.beta(2, 8) # max_valor | |
| feats[i, 6] = np.random.beta(5, 2) # dias_ativo | |
| feats[i, 7] = np.random.beta(2, 7) # dest_distintos | |
| feats[i, 8] = np.random.beta(1, 9) # noturnas | |
| feats[i, 9] = np.random.beta(1, 15) # internac | |
| feats[i, 10] = np.random.beta(6, 2) # score | |
| feats[i, 11] = np.random.beta(3, 4) # renda | |
| feats[i, 12] = np.random.beta(5, 3) # tempo_cliente | |
| feats[i, 13] = np.random.beta(1, 20) # chargeback | |
| feats[i, 14] = np.random.beta(2, 8) # dispositivos | |
| feats[i, 15] = np.random.binomial(1, 0.1) # pais_origem | |
| # ── ARESTAS ─────────────────────────────────────────────── | |
| # Grafo de transações: aresta = transferência entre contas | |
| src_list, dst_list = [], [] | |
| # Nós normais: conexões aleatórias com preferential attachment leve | |
| grau = np.ones(n_nos) | |
| for _ in range(n_arestas): | |
| # Preferential attachment | |
| prob = grau / grau.sum() | |
| s = np.random.choice(n_nos, p=prob) | |
| d = np.random.choice(n_nos) | |
| if s != d: | |
| src_list.append(s) | |
| dst_list.append(d) | |
| grau[s] += 1 | |
| grau[d] += 1 | |
| # Anomalias estruturais: hub (muitas conexões) ou isolado | |
| for idx in anomaly_idx[:n_anomalias//2]: | |
| # Hub suspeito — conecta com muitos nós diferentes | |
| n_extra = np.random.randint(10, 25) | |
| targets = np.random.choice( | |
| [i for i in range(n_nos) if i != idx], n_extra, replace=False) | |
| for t in targets: | |
| src_list.append(idx) | |
| dst_list.append(t) | |
| edge_index = torch.LongTensor([src_list, dst_list]) | |
| x = torch.FloatTensor(feats) | |
| # Labels APENAS para avaliação (não usados no treino) | |
| labels = torch.zeros(n_nos, dtype=torch.long) | |
| labels[anomaly_idx] = 1 | |
| data = Data(x=x, edge_index=edge_index, y=labels) | |
| data.feat_names = feat_names | |
| data.anomaly_idx = anomaly_idx | |
| return data, feat_names | |
| def get_adj_normalizada(edge_index, n_nos): | |
| """Adjacência normalizada para GCN: D^-1/2 A D^-1/2""" | |
| # Adiciona self-loops | |
| self_loops = torch.arange(n_nos).unsqueeze(0).repeat(2, 1) | |
| ei = torch.cat([edge_index, self_loops], dim=1) | |
| # Grau | |
| deg = torch.zeros(n_nos) | |
| deg.scatter_add_(0, ei[0], torch.ones(ei.shape[1])) | |
| deg_inv_sqrt = deg.pow(-0.5) | |
| deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0 | |
| # Pesos normalizados | |
| row, col = ei | |
| norm = deg_inv_sqrt[row] * deg_inv_sqrt[col] | |
| return ei, norm, n_nos |