DOMINANT / dominant.data.py
Danielfonseca1212's picture
Create dominant.data.py
b903062 verified
# dominant_data.py — Grafo bancário para anomaly detection sem labels
import numpy as np
import torch
from torch_geometric.data import Data
import pandas as pd
def gerar_grafo_anomaly(
n_nos=500, n_arestas=2500,
n_features=16, taxa_anomalia=0.05,
seed=42
):
"""
Gera grafo atribuído bancário com anomalias embutidas.
Tipos de anomalia (sem labels no treino):
- Structural: nó com padrão de conexão anômalo (hub suspeito, isolado)
- Attribute: nó com features fora da distribuição normal
- Combined: ambos ao mesmo tempo (fraude real)
O DOMINANT detecta todos sem ver nenhum label.
"""
np.random.seed(seed)
torch.manual_seed(seed)
n_anomalias = int(n_nos * taxa_anomalia)
anomaly_idx = np.random.choice(n_nos, n_anomalias, replace=False)
anomaly_set = set(anomaly_idx.tolist())
# ── FEATURES DOS NÓS ─────────────────────────────────────
# Contas bancárias: saldo, limite, age, tx_count, avg_valor...
feats = np.zeros((n_nos, n_features))
feat_names = [
'saldo_norm', 'limite_norm', 'idade_norm', 'tx_mes_norm',
'avg_valor_norm', 'max_valor_norm', 'dias_ativo_norm',
'n_dest_distintos_norm', 'tx_noturnas_ratio', 'tx_internac_ratio',
'score_credito', 'renda_norm', 'tempo_cliente_norm',
'chargeback_ratio', 'dispositivos_distintos', 'pais_origem',
]
for i in range(n_nos):
if i in anomaly_set:
# Anomalia de atributo — valores fora da distribuição
feats[i] = np.random.normal(0, 1, n_features)
feats[i, 0] = np.random.uniform(0.9, 1.0) # saldo extremo
feats[i, 3] = np.random.uniform(0.8, 1.0) # muitas tx
feats[i, 8] = np.random.uniform(0.7, 1.0) # muitas noturnas
feats[i, 13] = np.random.uniform(0.5, 1.0) # chargeback alto
else:
# Nó normal — distribuição realista
feats[i, 0] = np.random.beta(2, 5) # saldo
feats[i, 1] = np.random.beta(3, 4) # limite
feats[i, 2] = np.clip(np.random.normal(0.45, 0.15), 0, 1) # idade
feats[i, 3] = np.random.beta(2, 6) # tx_mes
feats[i, 4] = np.random.beta(3, 5) # avg_valor
feats[i, 5] = np.random.beta(2, 8) # max_valor
feats[i, 6] = np.random.beta(5, 2) # dias_ativo
feats[i, 7] = np.random.beta(2, 7) # dest_distintos
feats[i, 8] = np.random.beta(1, 9) # noturnas
feats[i, 9] = np.random.beta(1, 15) # internac
feats[i, 10] = np.random.beta(6, 2) # score
feats[i, 11] = np.random.beta(3, 4) # renda
feats[i, 12] = np.random.beta(5, 3) # tempo_cliente
feats[i, 13] = np.random.beta(1, 20) # chargeback
feats[i, 14] = np.random.beta(2, 8) # dispositivos
feats[i, 15] = np.random.binomial(1, 0.1) # pais_origem
# ── ARESTAS ───────────────────────────────────────────────
# Grafo de transações: aresta = transferência entre contas
src_list, dst_list = [], []
# Nós normais: conexões aleatórias com preferential attachment leve
grau = np.ones(n_nos)
for _ in range(n_arestas):
# Preferential attachment
prob = grau / grau.sum()
s = np.random.choice(n_nos, p=prob)
d = np.random.choice(n_nos)
if s != d:
src_list.append(s)
dst_list.append(d)
grau[s] += 1
grau[d] += 1
# Anomalias estruturais: hub (muitas conexões) ou isolado
for idx in anomaly_idx[:n_anomalias//2]:
# Hub suspeito — conecta com muitos nós diferentes
n_extra = np.random.randint(10, 25)
targets = np.random.choice(
[i for i in range(n_nos) if i != idx], n_extra, replace=False)
for t in targets:
src_list.append(idx)
dst_list.append(t)
edge_index = torch.LongTensor([src_list, dst_list])
x = torch.FloatTensor(feats)
# Labels APENAS para avaliação (não usados no treino)
labels = torch.zeros(n_nos, dtype=torch.long)
labels[anomaly_idx] = 1
data = Data(x=x, edge_index=edge_index, y=labels)
data.feat_names = feat_names
data.anomaly_idx = anomaly_idx
return data, feat_names
def get_adj_normalizada(edge_index, n_nos):
"""Adjacência normalizada para GCN: D^-1/2 A D^-1/2"""
# Adiciona self-loops
self_loops = torch.arange(n_nos).unsqueeze(0).repeat(2, 1)
ei = torch.cat([edge_index, self_loops], dim=1)
# Grau
deg = torch.zeros(n_nos)
deg.scatter_add_(0, ei[0], torch.ones(ei.shape[1]))
deg_inv_sqrt = deg.pow(-0.5)
deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
# Pesos normalizados
row, col = ei
norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]
return ei, norm, n_nos