# app.py — DOMINANT Graph Anomaly Detection | Sem Labels
import streamlit as st
import numpy as np
import torch
import os
from datetime import datetime
# Page-level configuration (browser tab title/icon and layout).
# The title and icon had been stored as mis-decoded UTF-8 ("β", "π¬").
# NOTE(review): the em dash and the microscope emoji are reconstructed from
# partially lost bytes — confirm against the original file.
st.set_page_config(
    page_title="DOMINANT — Anomaly Detection",
    page_icon="🔬",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Emits a markdown block with raw HTML enabled. As written, the literal holds
# only a newline, so nothing visible is rendered.
# NOTE(review): presumably a <style> block lived here and was stripped —
# confirm against version control.
st.markdown("""
""", unsafe_allow_html=True)
# ── SESSION STATE ─────────────────────────────────────────────
# Seed the per-session store on first run; keys that already exist are left
# untouched so their values survive Streamlit reruns.
for k, v in (
    ('trainer', None),
    ('treinado', False),
    ('data', None),
    ('edge_weight', None),
    ('metricas', None),
    ('neo4j', None),
    ('neo4j_ok', False),
):
    if k in st.session_state:
        continue
    st.session_state[k] = v
# ── NEO4J ─────────────────────────────────────────────────────
def get_neo4j_config():
    """Resolve the Neo4j connection settings.

    Lookup order:
      1. Flat Streamlit secrets (``NEO4J_URI``, ``NEO4J_USERNAME``,
         ``NEO4J_PASSWORD``, optional ``NEO4J_DATABASE``).
      2. A nested ``neo4j`` section in Streamlit secrets.
      3. Environment variables with the same flat names.

    Returns:
        dict with the keys ``uri``, ``username``, ``password`` and
        ``database``. Missing values are empty strings, except ``database``,
        which defaults to ``'neo4j'``.
    """
    resolved = {}
    try:
        secrets = st.secrets
        if 'NEO4J_URI' in secrets:
            resolved = dict(
                uri=secrets['NEO4J_URI'],
                username=secrets['NEO4J_USERNAME'],
                password=secrets['NEO4J_PASSWORD'],
                database=secrets.get('NEO4J_DATABASE', 'neo4j'),
            )
        elif 'neo4j' in secrets:
            section = secrets['neo4j']
            resolved = {
                field: section.get(field, fallback)
                for field, fallback in (
                    ('uri', ''),
                    ('username', ''),
                    ('password', ''),
                    ('database', 'neo4j'),
                )
            }
    except Exception:
        # Best effort: any failure while reading secrets falls through to the
        # environment-variable lookup below.
        pass

    if resolved.get('uri'):
        return resolved

    env = os.getenv
    return {
        'uri': env('NEO4J_URI', ''),
        'username': env('NEO4J_USERNAME', ''),
        'password': env('NEO4J_PASSWORD', ''),
        'database': env('NEO4J_DATABASE', 'neo4j'),
    }
@st.cache_resource
def conectar_neo4j():
    """Open and verify a Neo4j connection.

    Returns:
        ``(driver, database_name)`` when the driver could be created and a
        ``RETURN 1`` probe succeeded; ``None`` when the ``neo4j`` package is
        missing, the credentials are incomplete, or the probe failed.

    The result (including ``None``) is memoised by ``st.cache_resource``.
    """
    driver = None
    try:
        from neo4j import GraphDatabase
        cfg = get_neo4j_config()
        if not all((cfg['uri'], cfg['username'], cfg['password'])):
            return None
        driver = GraphDatabase.driver(
            cfg['uri'], auth=(cfg['username'], cfg['password']))
        # Round-trip a trivial query so a bad URI or bad credentials surface
        # here rather than on first real use.
        with driver.session(database=cfg['database']) as s:
            s.run('RETURN 1')
        return driver, cfg['database']
    except Exception:
        # The driver may already hold sockets when the probe fails; release
        # them instead of leaking the half-open driver.
        if driver is not None:
            try:
                driver.close()
            except Exception:
                pass
        return None
@st.cache_resource
def carregar_libs():
    """Import the project's data and model helpers lazily.

    Returns:
        ``(gerar_grafo_anomaly, get_adj_normalizada, TrainerDOMINANT)`` on
        success, or ``(error_message, None, None)`` when the import fails.
    """
    try:
        from dominant_data import gerar_grafo_anomaly, get_adj_normalizada
        from dominant_model import TrainerDOMINANT
    except Exception as falha:
        # The first slot carries the error text so callers can display it.
        return str(falha), None, None
    return gerar_grafo_anomaly, get_adj_normalizada, TrainerDOMINANT
# ── CHARTS ────────────────────────────────────────────────────
def loss_auc_svg(historico):
    """Build the training-curve chart snippet (loss and AUC per epoch).

    Args:
        historico: Mapping read by key for ``'loss'`` and ``'auc'``.

    Returns:
        ``''`` when the loss history is empty, otherwise the template string
        at the end of the function.
    """
    loss = historico['loss']
    auc = historico['auc']
    ep = len(loss)
    if ep == 0: return ''
    # Turns a series into space-separated "x,y" pairs: x spreads the epochs
    # over 0..460, y is min-max scaled into H and flipped (larger value =>
    # smaller y). `or 1` avoids dividing by zero for a constant series.
    def pts(vals, H=110):
        mn,mx = min(vals),max(vals); r = mx-mn or 1
        return ' '.join(f'{i*460/max(ep-1,1):.1f},{H-(v-mn)/r*H:.1f}'
                        for i,v in enumerate(vals))
    # NOTE(review): `pts` and `auc` are not referenced by the template below;
    # the SVG markup appears to have been stripped — confirm against version
    # control.
    return f"""
β Lossβ AUC (sem labels)
"""
def roc_svg(y_true, scores):
    """Build the ROC chart snippet and report the area under the curve.

    Args:
        y_true: Ground-truth labels passed to ``sklearn.metrics.roc_curve``.
        scores: Anomaly scores passed to ``sklearn.metrics.roc_curve``.

    Returns:
        Template string that embeds the ROC-AUC formatted to four decimals.
    """
    # Imported lazily so the module loads without scikit-learn installed.
    from sklearn.metrics import roc_curve, auc as sk_auc
    fpr,tpr,_ = roc_curve(y_true, scores)
    ra = sk_auc(fpr, tpr)
    # Curve as "x,y" pairs: FPR scaled to a width of 440, TPR flipped within a
    # height of 170.
    # NOTE(review): `pts` is not referenced by the template below; the SVG
    # markup appears to have been stripped — confirm against version control.
    pts = ' '.join(f'{f*440:.1f},{170-t*170:.1f}' for f,t in zip(fpr,tpr))
    return f"""
ROC-AUC {ra:.4f}(sem nenhum label no treino)
"""
def score_dist_svg(scores, y_true, thresh):
    """Score distribution: normal vs. anomalous nodes.

    Args:
        scores: Anomaly scores, indexed with boolean masks built from
            ``y_true`` (assumes array-like with mask indexing — TODO confirm).
        y_true: Labels compared against 0 (normal) and 1 (anomaly).
        thresh: Decision threshold; multiplied by 440 to get an x position.

    Returns:
        The template string at the end of the function.
    """
    scores_norm = scores[y_true == 0]
    scores_anom = scores[y_true == 1]
    # Histogram helper: `bins` equal-width buckets over the fixed range 0..1,
    # bar heights normalised so the tallest bucket reaches H.
    def hist_pts(vals, bins=30, H=120, W=440, color='#3fb950'):
        if len(vals) == 0: return ''
        counts, edges = np.histogram(vals, bins=bins, range=(0,1))
        max_c = max(counts.max(), 1)  # floor of 1 avoids dividing by zero
        bars = ''
        bw = W / bins
        for i, c in enumerate(counts):
            x = i * bw
            h = c / max_c * H
            # NOTE(review): appends an empty string, so x/h/color are unused;
            # the bar markup appears to have been stripped.
            bars += f''
        return bars
    t_x = thresh * 440
    # NOTE(review): `hist_pts`, `scores_norm`, `scores_anom` and `t_x` are not
    # referenced by the template below — confirm against version control.
    return f"""
DISTRIBUIΓΓO DO ANOMALY SCORE
β Normalβ Anomalia (real)
"""
def scatter_erros_svg(err_attr, err_struct, y_true, n_show=300):
    """Scatter plot of attribute error vs. structure error.

    Args:
        err_attr: Per-node attribute reconstruction errors (array-indexable).
        err_struct: Per-node structure reconstruction errors (array-indexable).
        y_true: Per-node labels; 1 is drawn as an anomaly.
        n_show: Maximum number of nodes sampled for the plot.

    Returns:
        The template string at the end of the function.
    """
    # Random subset without replacement; uses NumPy's global RNG, so the
    # sample differs between calls unless the seed is fixed elsewhere.
    idx = np.random.choice(len(err_attr), min(n_show, len(err_attr)), replace=False)
    ea = err_attr[idx]; es = err_struct[idx]; yt = y_true[idx]
    mn_a,mx_a = ea.min(),ea.max(); mn_s,mx_s = es.min(),es.max()
    # Min-max scale into 0..W; the 1e-8 guards a zero range.
    def sc(v,mn,mx,W): return (v-mn)/(mx-mn+1e-8)*W
    circles = ''
    for a,s,y in zip(ea,es,yt):
        px = sc(a,mn_a,mx_a,400); py = 200-sc(s,mn_s,mx_s,200)
        col = '#f85149' if y==1 else '#3fb95066'
        r = 5 if y==1 else 3
        # NOTE(review): appends an empty string, so px/py/col/r are unused;
        # the circle markup appears to have been stripped.
        circles += f''
    return f"""
ERRO ATRIBUTO vs ERRO ESTRUTURA
β Normalβ Anomalia real
"""
def tsne_svg(embeddings, y_true):
    """Project the embeddings to 2-D with t-SNE and build the chart snippet.

    Args:
        embeddings: 2-D array-like handed to ``TSNE.fit_transform``.
        y_true: Per-node labels; 1 is drawn as an anomaly.

    Returns:
        The chart template on success. On any failure (scikit-learn missing,
        too few samples, bad input) a short message that includes the error
        text is returned instead of raising.
    """
    try:
        from sklearn.manifold import TSNE
        tsne = TSNE(n_components=2, random_state=42,
                    perplexity=min(30, len(embeddings)//3))
        coords = tsne.fit_transform(embeddings)
        cx = coords[:, 0]
        cy = coords[:, 1]
        mn_x, mx_x = cx.min(), cx.max()
        mn_y, mx_y = cy.min(), cy.max()

        # Min-max scale into 0..W; the 1e-8 guards a zero range.
        def sc(v, mn, mx, W):
            return (v - mn) / (mx - mn + 1e-8) * W

        circles = ''
        for i, (x, y) in enumerate(zip(cx, cy)):
            px = sc(x, mn_x, mx_x, 440)
            py = sc(y, mn_y, mx_y, 260)
            col = '#f85149' if y_true[i] == 1 else '#3fb95055'
            r = 6 if y_true[i] == 1 else 3
            # NOTE(review): appends an empty string, so px/py/col/r are
            # unused; the circle markup appears to have been stripped.
            circles += f''
        # NOTE(review): template kept verbatim because its markup is missing;
        # restore it (and its encoding) from version control.
        return f"""
EMBEDDINGS t-SNE β separaΓ§Γ£o aprendida SEM LABELS
β Normalβ Anomalia
"""
    except Exception as e:
        # Was a single-quoted f-string split across three physical lines,
        # which is a SyntaxError; the line breaks are now explicit escapes and
        # the mis-decoded "í" is restored.
        return f'\nt-SNE indisponível: {e}\n'
def top_anomalias_html(top_list, feat_names, data_x):
    """Render the ranked-anomaly list as one concatenated snippet.

    Args:
        top_list: Iterable of dicts read by the keys ``'idx'``, ``'score'``
            and ``'label_real'``.
        feat_names: Sequence of feature names, indexed by feature position.
        data_x: Node-feature container; ``data_x[idx].numpy()`` is called, so
            rows are presumably torch tensors — TODO confirm.

    Returns:
        One string with an entry per item in ``top_list`` (empty string when
        the list is empty).
    """
    html = ''
    for item in top_list:
        idx = item['idx']
        score = item['score']
        real = item['label_real']
        # Severity bucket by score: > 0.7 high, > 0.4 medium, otherwise low.
        cls = 'anomaly-high' if score > 0.7 else ('anomaly-med' if score > 0.4 else 'anomaly-low')
        badge = 'β REAL' if real else ''
        # Top anomalous features of this node: the three features whose value
        # deviates most (in absolute terms) from the node's own feature mean.
        feats_no = data_x[idx].numpy()
        top_feat_idx = np.argsort(np.abs(feats_no - feats_no.mean()))[::-1][:3]
        feat_str = ' Β· '.join(feat_names[fi] for fi in top_feat_idx)
        bar_w = int(score * 100)
        # NOTE(review): `cls` and `bar_w` are not referenced by the template
        # below; the HTML markup appears to have been stripped, and the badge
        # and separator literals look mis-decoded — confirm against version
        # control.
        html += f"""
N{idx:04d}
{score:.4f}{feat_str}
{badge}
"""
    return html
# ── SIDEBAR ───────────────────────────────────────────────────
def sidebar():
    """Render the sidebar controls and return the selected configuration.

    Returns:
        dict with the keys ``n_nos``, ``n_arestas``, ``n_feats``, ``taxa_an``
        (the percent slider divided by 100), ``hidden``, ``embed``, ``alpha``,
        ``lr``, ``epocas`` and ``dropout``.
    """
    # The labels below had been stored as mis-decoded UTF-8 ("NΓ³s", "Ξ±",
    # "Γpocas", ...). Accented letters, "α" and "⚠️" decode unambiguously.
    # NOTE(review): "🔬", "🗄️" and "→" are reconstructed from partially lost
    # bytes — confirm against the original file.
    sb = st.sidebar

    sb.markdown('## 🔬 DOMINANT Config')
    n_nos = sb.slider('Nós no grafo', 100, 1000, 500, 50)
    n_arestas = sb.slider('Arestas', 500, 8000, 2500, 500)
    n_feats = sb.select_slider('Features por nó', [8, 16, 32], 16)
    taxa_an = sb.slider('Taxa anomalia %', 2, 15, 5)

    sb.markdown('---')
    sb.markdown('### Modelo')
    hidden = sb.select_slider('Hidden dim', [32, 64, 128], 64)
    embed = sb.select_slider('Embed dim', [16, 32, 64], 32)
    alpha = sb.slider('α (struct vs attr)', 0.0, 1.0, 0.5, 0.05)
    lr = sb.select_slider('LR', [0.001, 0.003, 0.005, 0.01], 0.005)
    epocas = sb.slider('Épocas', 20, 200, 100, 10)
    dropout = sb.slider('Dropout', 0.1, 0.5, 0.3, 0.05)

    sb.markdown('---')
    sb.markdown(f'**α = {alpha:.2f}**')
    sb.caption('α→1: mais peso na estrutura\nα→0: mais peso nos atributos')

    if st.session_state.neo4j_ok:
        sb.success('🗄️ Neo4j Conectado')
    else:
        sb.warning('⚠️ Neo4j Offline')

    return {
        'n_nos': n_nos,
        'n_arestas': n_arestas,
        'n_feats': n_feats,
        # The slider is in percent; downstream code receives a fraction.
        'taxa_an': taxa_an / 100,
        'hidden': hidden,
        'embed': embed,
        'alpha': alpha,
        'lr': lr,
        'epocas': epocas,
        'dropout': dropout,
    }
# ── MAIN ──────────────────────────────────────────────────────
# Application entry point: connects to Neo4j once per session, renders the
# sidebar, then the page header and the tab contents.
# NOTE(review): this function is damaged as extracted — indentation is lost,
# several string literals are broken across lines, and the code between the
# page header and the analysis charts (creation of `tabs`, definition of `m`,
# earlier tabs) is missing. The lines are kept verbatim; restore the original
# from version control before running.
def main():
# Connect lazily: only when this session has no Neo4j handle yet.
if st.session_state.neo4j is None:
conn = conectar_neo4j()
st.session_state.neo4j = conn
st.session_state.neo4j_ok = conn is not None
# Sidebar widgets -> dict of hyper-parameters (see sidebar()).
cfg = sidebar()
# Page header. NOTE(review): opened with triple quotes but closed with a
# single quote below; markup appears stripped.
st.markdown("""
DOMINANT
Deep Anomaly Detection on Attributed Networks Β· Ding et al., IJCAI 2019 Β·
Zero labels no treino
',
unsafe_allow_html=True)
st.markdown(' ', unsafe_allow_html=True)
# Analysis charts in two columns. NOTE(review): `m` is read here without a
# visible definition in this function; presumably st.session_state.metricas
# (as assigned further down) — confirm.
c1, c2 = st.columns(2)
with c1:
st.components.v1.html(
roc_svg(m['y_true'], m['scores']), height=230)
st.components.v1.html(
score_dist_svg(m['scores'], m['y_true'], m['thresh']), height=200)
with c2:
st.components.v1.html(
scatter_erros_svg(m['err_attr'], m['err_struct'], m['y_true']),
height=260)
st.components.v1.html(
tsne_svg(m['embeddings'], m['y_true']), height=310)
# ── TAB 4: TOP ANOMALIES ──────────────────────────────────────
# NOTE(review): `tabs` is used without a visible definition in this function.
with tabs[4]:
if not st.session_state.treinado:
st.warning('β¬ οΈ Treine o modelo primeiro.')
else:
st.markdown('### NΓ³s mais anΓ΄malos detectados')
st.markdown('Rankeados por anomaly score β sem nenhum label no treino.')
n_top = st.slider('Top N', 10, 50, 20)
top = st.session_state.trainer.get_top_anomalias(n_top)
# Falls back to 16 generic names when 'feat_names' is not in session state.
# NOTE(review): the sidebar also offers 8 and 32 features; with 8 the
# per-feature loop below would index past the node's features unless
# 'feat_names' is stored elsewhere — confirm.
feat_names = st.session_state.get('feat_names', [f'f{i}' for i in range(16)])
data_x = st.session_state.data.x
# Count of returned items whose ground-truth label is 1.
n_detectados = sum(1 for t in top if t['label_real'] == 1)
n_real_total = int(st.session_state.data.y.sum())
c1,c2,c3 = st.columns(3)
c1.metric('Top anomalias analisadas', n_top)
c2.metric('Anomalias reais detectadas', n_detectados)
# Precision@N divides by the requested n_top, not by len(top).
c3.metric(f'Precision@{n_top}', f'{n_detectados/n_top:.1%}')
st.markdown(' ', unsafe_allow_html=True)
st.markdown(
f'
'
f'{top_anomalias_html(top, feat_names, data_x)}
',
unsafe_allow_html=True)
# Detail view for a single node
with st.expander('π Inspecionar nΓ³ especΓfico'):
idx_insp = st.number_input('Γndice do nΓ³', 0,
int(data_x.shape[0])-1, int(top[0]['idx']))
m = st.session_state.metricas
score_no = float(m['scores'][idx_insp])
label_no = int(st.session_state.data.y[idx_insp])
feats_no = data_x[idx_insp].numpy()
st.markdown(f"""
**NΓ³ {idx_insp}** Β· Score: `{score_no:.4f}` Β·
Err Atributo: `{m['err_attr'][idx_insp]:.4f}` Β·
Err Estrutura: `{m['err_struct'][idx_insp]:.4f}` Β·
Label real: `{'β Anomalia' if label_no else 'β Normal'}`
""")
# Node features rendered as bars
bars_html = '
'
# One bar per feature name: width is |value| as a percentage capped at 100,
# colour is chosen by the thresholds 0.7 / 0.4 on the raw value.
for fi, fn in enumerate(feat_names):
v = float(feats_no[fi])
pct = min(abs(v)*100, 100)
cor = '#f85149' if v > 0.7 else ('#d29922' if v > 0.4 else '#3fb950')
bars_html += (
f'
'
f'{fn}'
f'
'
f'
'
f'{v:.3f}'
f'
')
bars_html += '
'
st.markdown(bars_html, unsafe_allow_html=True)
# ── TAB 5: NEO4J ──────────────────────────────────────────────
with tabs[5]:
st.header('ποΈ Neo4j')
if not st.session_state.neo4j_ok:
st.warning('Neo4j offline.')
with st.expander('Como configurar'):
st.markdown("""
**HF Spaces β Settings β Variables and secrets:**
| Chave | Valor |
|---|---|
| `NEO4J_URI` | `neo4j+s://XXXXXXXX.databases.neo4j.io` |
| `NEO4J_USERNAME` | `neo4j` |
| `NEO4J_PASSWORD` | `sua_senha` |
| `NEO4J_DATABASE` | `neo4j` |
""")
else:
st.success('Conectado!')
# Persist the run summary plus the 20 highest-ranked nodes on button click.
if st.session_state.treinado and st.button('πΎ Salvar anomalias no Neo4j'):
driver, db = st.session_state.neo4j
top = st.session_state.trainer.get_top_anomalias(50)
m = st.session_state.metricas
try:
with driver.session(database=db) as s:
# One DOMINANTRun node per save, keyed by the current timestamp.
# NOTE(review): alpha comes from the current sidebar value (cfg), which may
# differ from the value used when the model was trained — confirm.
s.run("""
MERGE (r:DOMINANTRun {ts: $ts})
SET r.auc=$auc, r.ap=$ap, r.alpha=$alpha,
r.n_nos=$n, r.n_anomalias=$na
""", ts=datetime.now().isoformat(),
auc=float(m['auc']), ap=float(m['ap']),
alpha=cfg['alpha'],
n=int(st.session_state.data.x.shape[0]),
na=int(st.session_state.data.y.sum()))
# AnomaliaNode is merged on idx alone, so later saves overwrite the score and
# label of the same node index; no relationship to the run node is created.
for item in top[:20]:
s.run("""
MERGE (n:AnomaliaNode {idx: $idx})
SET n.score=$score, n.label=$label
""", idx=item['idx'], score=item['score'],
label=item['label_real'])
st.success(f'β Run + {min(20,len(top))} anomalias salvas!')
except Exception as e:
st.error(str(e))
# Script entry point: build the page only when this file is executed as the
# main module.
if __name__ == "__main__":
    main()