# MLGraph-Bitcoin-GAD / graph_builder.py
# Uploaded by thanhphxu using huggingface_hub (commit db886e4, verified).
from typing import Dict, Any, List, Tuple, Set
from collections import deque, defaultdict
from explorers import fetch_with_fallback
from config import AppConfig
def expand_ego(txid: str, k: int, source: str, cfg: AppConfig):
    """Build the ego subgraph of a transaction, up to ``k`` hops each way.

    Starting from ``txid`` the graph is grown breadth-first in two passes:
    first backward through the ``"txid"`` of every ``"vin"`` entry (parents),
    then forward through the outspends reported for each transaction
    (children). Both passes start at the seed, so nodes discovered while
    walking backward are not themselves expanded forward, and vice versa.

    Args:
        txid: Transaction id of the ego (center) node.
        k: Maximum number of hops to follow in each direction. With
            ``k <= 0`` only the seed node is returned.
        source: Passed through unchanged to ``fetch_with_fallback``.
        cfg: Configuration object. ``cfg.MAX_NODES`` caps the number of
            nodes; ``cfg.MAX_EDGES`` caps the number of distinct edges
            (a falsy value disables the edge cap). Also passed through to
            ``fetch_with_fallback``.

    Returns:
        A 5-tuple ``(nodes, edges, center_idx, node_meta, logs)``:

        * ``nodes`` -- txids in discovery order.
        * ``edges`` -- unique ``(parent_idx, child_idx)`` pairs indexing
          into ``nodes``, in discovery order.
        * ``center_idx`` -- index of ``txid`` in ``nodes``.
        * ``node_meta`` -- txid -> transaction payload as returned by
          ``fetch_with_fallback``.
        * ``logs`` -- human-readable notes about caps that were hit.

        When the seed cannot be fetched, or ``cfg.MAX_NODES`` leaves no room
        for it, ``([], [], -1, {}, [reason, ...])`` is returned instead.
    """
    client, seed_tx, seed_outspends, errs = fetch_with_fallback(txid, cfg, source)
    if client is None:
        return [], [], -1, {}, ["All providers failed", *(errs or [])]

    logs: List[str] = []
    already_logged = set()
    nodes: List[str] = []
    idx_map: Dict[str, int] = {}
    node_meta: Dict[str, Dict[str, Any]] = {}
    # A dict serves as an insertion-ordered set: duplicates collapse as they
    # are added, and the edge cap counts distinct edges only.
    edges: Dict[Tuple[int, int], None] = {}
    # Outspends arrive with every fetch; remembering them means the forward
    # pass does not have to request each transaction a second time.
    outspends_of: Dict[str, Any] = {txid: seed_outspends}

    def log_once(message: str) -> None:
        """Append ``message`` to ``logs`` unless it is already there."""
        if message not in already_logged:
            already_logged.add(message)
            logs.append(message)

    def add_node(tid: str, meta: Dict[str, Any]):
        """Register ``tid`` and return its index, or None when at capacity."""
        if tid in idx_map:
            return idx_map[tid]
        if len(nodes) >= cfg.MAX_NODES:
            return None
        idx = len(nodes)
        nodes.append(tid)
        idx_map[tid] = idx
        node_meta[tid] = meta
        return idx

    def fetch_tx(tid: str):
        """Fetch ``tid``; return its payload (None on failure) and cache outspends."""
        _, tx_json, outspends, _ = fetch_with_fallback(tid, cfg, source)
        if tx_json is None:
            return None
        outspends_of[tid] = outspends
        return tx_json

    def tx_of(tid: str):
        """Return the payload of a registered node, re-fetching it if absent."""
        tx_json = node_meta.get(tid)
        if tx_json is None:
            tx_json = fetch_tx(tid)
            if tx_json is not None:
                node_meta[tid] = tx_json
        return tx_json

    def parents_of(tid: str):
        """Return the txids referenced by the inputs of ``tid``."""
        tx_json = tx_of(tid)
        if tx_json is None:
            return []
        return [vin.get("txid") for vin in tx_json.get("vin", [])]

    def children_of(tid: str):
        """Return the outspend entries recorded for ``tid``.

        NOTE(review): entries are treated as spending txids, with falsy
        entries meaning "unspent" -- confirm against ``fetch_with_fallback``.
        """
        if tx_of(tid) is None:
            return []
        return outspends_of.get(tid) or []

    def expand(direction: str, neighbors_of, forward: bool) -> bool:
        """Run one BFS pass from the seed; return False once MAX_EDGES is hit."""
        frontier = deque([(txid, 0)])
        while frontier:
            cur, depth = frontier.popleft()
            if depth >= k:
                continue
            cur_idx = idx_map[cur]
            for other in neighbors_of(cur):
                if not other:
                    # Coinbase inputs / unspent outputs carry no txid.
                    continue
                other_idx = idx_map.get(other)
                if other_idx is None:
                    if len(nodes) >= cfg.MAX_NODES:
                        # No room for new nodes, but keep scanning so edges
                        # between nodes we already hold are still recorded.
                        log_once(f"MAX_NODES reached during {direction} expansion")
                        continue
                    tx_json = fetch_tx(other)
                    if tx_json is None:
                        continue
                    other_idx = add_node(other, tx_json)
                    if other_idx is None:
                        continue
                    frontier.append((other, depth + 1))
                # Edges always point parent -> child.
                edge = (cur_idx, other_idx) if forward else (other_idx, cur_idx)
                edges[edge] = None
                if cfg.MAX_EDGES and len(edges) >= cfg.MAX_EDGES:
                    log_once("MAX_EDGES reached; stopping further edge additions")
                    return False
        return True

    # seed
    center_idx = add_node(txid, seed_tx)
    if center_idx is None:
        return [], [], -1, {}, ["MAX_NODES reached at seed"]

    # Backward first; the forward pass is skipped if the edge budget is
    # already exhausted, so the cap is a hard limit on the returned edges.
    if expand("backward", parents_of, forward=False):
        expand("forward", children_of, forward=True)

    return nodes, list(edges), center_idx, node_meta, logs