Spaces:
Sleeping
Sleeping
File size: 4,321 Bytes
from typing import Dict, Any, List, Tuple, Set
from collections import deque, defaultdict
from explorers import fetch_with_fallback
from config import AppConfig
def expand_ego(
    txid: str,
    k: int,
    source: str,
    cfg: "AppConfig",
    *,
    fetch: Any = None,
) -> Tuple[List[str], List[Tuple[int, int]], int, Dict[str, Dict[str, Any]], List[str]]:
    """Expand the ego-subgraph of ``txid`` up to ``k`` hops in both directions.

    Parents are the transactions named by the ``"txid"`` of each entry in a
    transaction's ``"vin"`` list; children are the entries of the outspends
    value returned by the fetcher next to the transaction. The backward
    (parent) pass runs first, then the forward (child) pass.

    Args:
        txid: Transaction id the graph is centred on.
        k: Maximum number of hops from the centre; ``k <= 0`` returns only
            the centre node.
        source: Passed through unchanged to the fetcher.
        cfg: Configuration object. ``cfg.MAX_NODES`` caps the node count and
            ``cfg.MAX_EDGES`` (falsy means unlimited) caps the edge count; it
            is also passed through to the fetcher.
        fetch: Optional replacement for ``fetch_with_fallback`` with the same
            call shape ``fetch(txid, cfg, source) -> (client, tx, outspends,
            errors)``, e.g. a caching wrapper. Defaults to
            ``fetch_with_fallback``.

    Returns:
        A 5-tuple ``(nodes, edges, center_idx, node_meta, logs)``:
        nodes: txids in discovery order.
        edges: unique ``(parent_idx, child_idx)`` pairs indexing ``nodes``,
            in discovery order.
        center_idx: index of ``txid`` in ``nodes`` (``-1`` on failure).
        node_meta: txid -> transaction object returned by the fetcher.
        logs: human-readable notes about limits that were hit or failures.
    """
    if fetch is None:
        fetch = fetch_with_fallback

    client, seed_tx, seed_spends, errs = fetch(txid, cfg, source)
    if client is None:
        return [], [], -1, {}, ["All providers failed", *(errs or [])]

    logs: List[str] = []
    nodes: List[str] = []
    idx_map: Dict[str, int] = {}
    edges: List[Tuple[int, int]] = []
    seen_edges: Set[Tuple[int, int]] = set()
    node_meta: Dict[str, Dict[str, Any]] = {}
    # Outspends delivered by the same fetch that produced each transaction,
    # kept so the forward pass never has to fetch a node a second time.
    spends: Dict[str, Any] = {}

    def log_once(message: str) -> None:
        # A limit is typically hit many times in a row; report it once.
        if message not in logs:
            logs.append(message)

    def has_room() -> bool:
        return len(nodes) < cfg.MAX_NODES

    def add_node(tid: str, meta: Any) -> int:
        idx = len(nodes)
        nodes.append(tid)
        idx_map[tid] = idx
        node_meta[tid] = meta
        return idx

    def fetch_tx(tid: str) -> Any:
        """Fetch ``tid``; cache its outspends and return its tx (or None)."""
        _, tx, outsp, _ = fetch(tid, cfg, source)
        if tx is not None:
            spends[tid] = outsp
        return tx

    def tx_of(tid: str) -> Any:
        """Return the stored tx for a known node, retrying the fetch if the
        stored value is missing (possible only for the seed)."""
        tx = node_meta.get(tid)
        if tx is None:
            tx = fetch_tx(tid)
            if tx is not None:
                node_meta[tid] = tx
        return tx

    def add_edge(parent_idx: int, child_idx: int) -> bool:
        """Record the edge once; return False when the edge cap is reached."""
        edge = (parent_idx, child_idx)
        if edge not in seen_edges:
            seen_edges.add(edge)
            edges.append(edge)
        if cfg.MAX_EDGES and len(edges) >= cfg.MAX_EDGES:
            log_once("MAX_EDGES reached; stopping further edge additions")
            return False
        return True

    def parents_of(tid: str) -> Any:
        tx = tx_of(tid)
        if tx is None:
            return []
        return [vin.get("txid") for vin in (tx.get("vin") or [])]

    def children_of(tid: str) -> Any:
        if tx_of(tid) is None:
            return []
        return spends.get(tid) or []

    def walk(neighbours: Any, backward: bool, nodes_full_msg: str) -> bool:
        """Breadth-first expansion from the seed in one direction.

        Returns False as soon as the edge cap is reached so the caller can
        stop expanding altogether, True otherwise.
        """
        frontier = deque([(txid, 0)])
        while frontier:
            cur, depth = frontier.popleft()
            if depth >= k:
                continue
            cur_idx = idx_map[cur]
            for other in neighbours(cur):
                if not other:
                    continue
                other_idx = idx_map.get(other)
                if other_idx is None:
                    if not has_room():
                        # Skip only this new node: later neighbours may
                        # already be in the graph and still need their edge.
                        # Checked before fetching to avoid a wasted request.
                        log_once(nodes_full_msg)
                        continue
                    tx = fetch_tx(other)
                    if tx is None:
                        continue
                    other_idx = add_node(other, tx)
                    frontier.append((other, depth + 1))
                # Edges always point parent -> child.
                if backward:
                    keep_going = add_edge(other_idx, cur_idx)
                else:
                    keep_going = add_edge(cur_idx, other_idx)
                if not keep_going:
                    return False
        return True

    # seed
    if not has_room():
        return [], [], -1, {}, ["MAX_NODES reached at seed"]
    center_idx = add_node(txid, seed_tx)
    spends[txid] = seed_spends

    # Backward (parents) first; forward (children) only if the edge cap
    # has not been reached yet.
    if walk(parents_of, True, "MAX_NODES reached during backward expansion"):
        walk(children_of, False, "MAX_NODES reached during forward expansion")

    return nodes, edges, center_idx, node_meta, logs
|