from typing import Dict, Any, List, Tuple, Set
from collections import deque, defaultdict

from explorers import fetch_with_fallback
from config import AppConfig


def expand_ego(
    txid: str, k: int, source: str, cfg: AppConfig
) -> Tuple[
    List[str],
    List[Tuple[int, int]],
    int,
    Dict[str, Dict[str, Any]],
    List[str],
]:
    """
    Expand the ego-subgraph around ``txid`` up to ``k`` steps backward
    (parents, via each transaction's ``vin`` entries) and forward (children,
    via the spending-tx list returned by ``fetch_with_fallback``).

    Growth is bounded by ``cfg.MAX_NODES`` and, when truthy, by
    ``cfg.MAX_EDGES`` (counted over unique edges). Once the edge cap is hit
    the whole expansion stops, so the returned edge list never exceeds it.

    Args:
      txid: id of the center transaction.
      k: maximum number of hops to follow in each direction.
      source: passed through unchanged to ``fetch_with_fallback``.
      cfg: configuration object; ``MAX_NODES`` and ``MAX_EDGES`` are read
        here and the object is also forwarded to ``fetch_with_fallback``.

    Returns:
      nodes: List[str] txids, in discovery order
      edges: List[Tuple[int,int]] unique parent->child indices into ``nodes``,
        in discovery order
      center_idx: int index of ``txid`` in ``nodes`` (-1 on failure)
      node_meta: Dict[txid, dict] transaction payload for every node
      logs: List[str] human-readable notes (limits reached, fetch failures)
    """
    logs: List[str] = []
    client, tx0, out0, errs = fetch_with_fallback(txid, cfg, source)
    if client is None:
        return [], [], -1, {}, ["All providers failed", *(errs or [])]

    nodes: List[str] = []
    idx_map: Dict[str, int] = {}
    edges: List[Tuple[int, int]] = []
    seen_edges: Set[Tuple[int, int]] = set()
    node_meta: Dict[str, Dict[str, Any]] = {}

    def log_once(msg: str) -> None:
        """Append ``msg`` to ``logs`` unless it is already there."""
        if msg not in logs:
            logs.append(msg)

    def add_node(tid: str, meta: Dict[str, Any]):
        """Register ``tid`` and return its index; None when MAX_NODES is hit."""
        if tid in idx_map:
            return idx_map[tid]
        if len(nodes) >= cfg.MAX_NODES:
            return None
        idx = len(nodes)
        nodes.append(tid)
        idx_map[tid] = idx
        node_meta[tid] = meta
        return idx

    def ensure_tx(tid: str):
        """Fetch ``tid``; return (tx, spending-tx list) or (None, None)."""
        _, tj, outsp, _ = fetch_with_fallback(tid, cfg, source)
        if tj is None:
            return None, None
        return tj, outsp

    def add_edge(parent_idx: int, child_idx: int) -> bool:
        """Record a unique parent->child edge.

        Returns False once the MAX_EDGES cap has been reached, signalling
        that the caller must stop expanding.
        """
        edge = (parent_idx, child_idx)
        if edge not in seen_edges:
            # Several inputs may spend the same parent; keep one edge only so
            # duplicates do not eat into the MAX_EDGES budget.
            seen_edges.add(edge)
            edges.append(edge)
        if cfg.MAX_EDGES and len(edges) >= cfg.MAX_EDGES:
            log_once("MAX_EDGES reached; stopping further edge additions")
            return False
        return True

    # seed
    center_idx = add_node(txid, tx0)
    if center_idx is None:
        return [], [], -1, {}, ["MAX_NODES reached at seed"]

    # Cleared as soon as MAX_EDGES is reached; terminates both passes.
    edges_open = True

    # BFS backward (parents)
    frontier_par = deque([(txid, 0)])
    while frontier_par and edges_open:
        cur, depth = frontier_par.popleft()
        if depth >= k:
            continue
        tj = node_meta.get(cur)
        if tj is None:
            # Only possible when the node was registered without a payload
            # (e.g. the seed fetch returned no transaction body).
            tj, _ = ensure_tx(cur)
            if tj is None:
                continue
            node_meta[cur] = tj
        # Every frontier entry was registered through add_node beforehand.
        cidx = idx_map[cur]
        for vi in tj.get("vin") or []:
            ptx = vi.get("txid")
            if not ptx:
                # Inputs without a txid cannot be followed.
                continue
            pidx = idx_map.get(ptx)
            if pidx is None:
                if len(nodes) >= cfg.MAX_NODES:
                    log_once("MAX_NODES reached during backward expansion")
                    break
                ptj, _ = ensure_tx(ptx)
                if ptj is None:
                    continue
                pidx = add_node(ptx, ptj)
                if pidx is None:
                    continue
                frontier_par.append((ptx, depth + 1))
            # edge parent->child
            if not add_edge(pidx, cidx):
                edges_open = False
                break

    # BFS forward (children). Each frontier entry carries the spending-tx
    # list obtained when the node was first fetched, so a node is not fetched
    # a second time just to read it.
    # NOTE(review): assumes fetch_with_fallback returns the same data for
    # repeated calls with the same arguments - confirm.
    frontier_ch = deque([(txid, 0, out0)])
    while frontier_ch and edges_open:
        cur, depth, outsp = frontier_ch.popleft()
        if depth >= k:
            continue
        if node_meta.get(cur) is None:
            tj, outsp = ensure_tx(cur)
            if tj is None:
                continue
            node_meta[cur] = tj
        elif outsp is None:
            # Nothing usable was carried over; retry once, best-effort.
            try:
                _, _, outsp, _ = fetch_with_fallback(cur, cfg, source)
            except Exception as exc:
                logs.append(f"Outspends fetch failed for {cur}: {exc}")
                outsp = None
        if outsp is None:
            continue
        pidx = idx_map[cur]
        # NOTE(review): entries of outsp are treated as child txids, with
        # falsy entries skipped - confirm against fetch_with_fallback.
        for child_tx in outsp:
            if not child_tx:
                continue
            cidx = idx_map.get(child_tx)
            if cidx is None:
                if len(nodes) >= cfg.MAX_NODES:
                    log_once("MAX_NODES reached during forward expansion")
                    break
                ctj, cout = ensure_tx(child_tx)
                if ctj is None:
                    continue
                cidx = add_node(child_tx, ctj)
                if cidx is None:
                    continue
                frontier_ch.append((child_tx, depth + 1, cout))
            if not add_edge(pidx, cidx):
                edges_open = False
                break

    # Edges were de-duplicated on insertion, so the list is already unique
    # and keeps discovery order.
    return nodes, edges, center_idx, node_meta, logs