Spaces:
Sleeping
Sleeping
| from typing import Dict, Any, List, Tuple, Set | |
| from collections import deque, defaultdict | |
| from explorers import fetch_with_fallback | |
| from config import AppConfig | |
| def expand_ego(txid: str, k: int, source: str, cfg: AppConfig): | |
| """ | |
| Expand ego-subgraph up to k steps backward (parents) and forward (children). | |
| Returns: | |
| nodes: List[str] txids | |
| edges: List[Tuple[int,int]] parent->child indices | |
| center_idx: int | |
| node_meta: Dict[txid, dict] | |
| logs: List[str] | |
| """ | |
| logs = [] | |
| client, tx0, out0, errs = fetch_with_fallback(txid, cfg, source) | |
| if client is None: | |
| return [], [], -1, {}, ["All providers failed", *(errs or [])] | |
| nodes: List[str] = [] | |
| idx_map: Dict[str, int] = {} | |
| edges: List[Tuple[int,int]] = [] | |
| node_meta: Dict[str, Dict[str, Any]] = {} | |
| def add_node(tid: str, meta: Dict[str, Any]): | |
| if tid in idx_map: | |
| return idx_map[tid] | |
| if len(nodes) >= cfg.MAX_NODES: | |
| return None | |
| idx = len(nodes) | |
| nodes.append(tid) | |
| idx_map[tid] = idx | |
| node_meta[tid] = meta | |
| return idx | |
| def ensure_tx(tid: str): | |
| c, tj, outsp, _ = fetch_with_fallback(tid, cfg, source) | |
| if tj is None: | |
| return None, None | |
| return tj, outsp | |
| # seed | |
| center_idx = add_node(txid, tx0) | |
| if center_idx is None: | |
| return [], [], -1, {}, ["MAX_NODES reached at seed"] | |
| frontier_par = deque([(txid, 0)]) | |
| frontier_ch = deque([(txid, 0)]) | |
| # BFS backward (parents) | |
| while frontier_par: | |
| cur, depth = frontier_par.popleft() | |
| if depth >= k: | |
| continue | |
| tj = node_meta.get(cur) | |
| if tj is None: | |
| t, o = ensure_tx(cur) | |
| if t is None: | |
| continue | |
| node_meta[cur] = t | |
| tj = t | |
| for vi in tj.get("vin", []): | |
| ptx = vi.get("txid") | |
| if not ptx: | |
| continue | |
| if ptx not in idx_map: | |
| if len(nodes) >= cfg.MAX_NODES: | |
| logs.append("MAX_NODES reached during backward expansion") | |
| break | |
| ptj, pout = ensure_tx(ptx) | |
| if ptj is None: | |
| continue | |
| pidx = add_node(ptx, ptj) | |
| if pidx is None: | |
| continue | |
| frontier_par.append((ptx, depth+1)) | |
| else: | |
| pidx = idx_map[ptx] | |
| # edge parent->child | |
| cidx = idx_map.get(cur) | |
| if cidx is not None: | |
| edges.append((pidx, cidx)) | |
| if cfg.MAX_EDGES and len(edges) >= cfg.MAX_EDGES: | |
| logs.append("MAX_EDGES reached; stopping further edge additions") | |
| break | |
| # BFS forward (children) | |
| while frontier_ch: | |
| cur, depth = frontier_ch.popleft() | |
| if depth >= k: | |
| continue | |
| tj = node_meta.get(cur) | |
| if tj is None: | |
| t, o = ensure_tx(cur) | |
| if t is None: | |
| continue | |
| node_meta[cur] = t | |
| tj = t | |
| outsp = None | |
| try: | |
| _, _, outsp, _ = fetch_with_fallback(cur, cfg, source) | |
| except Exception: | |
| outsp = None | |
| if outsp is None: | |
| continue | |
| for child_tx in outsp: | |
| if not child_tx: | |
| continue | |
| if child_tx not in idx_map: | |
| if len(nodes) >= cfg.MAX_NODES: | |
| logs.append("MAX_NODES reached during forward expansion") | |
| break | |
| ctj, cout = ensure_tx(child_tx) | |
| if ctj is None: | |
| continue | |
| cidx = add_node(child_tx, ctj) | |
| if cidx is None: | |
| continue | |
| frontier_ch.append((child_tx, depth+1)) | |
| else: | |
| cidx = idx_map[child_tx] | |
| pidx = idx_map.get(cur) | |
| if pidx is not None: | |
| edges.append((pidx, cidx)) | |
| if cfg.MAX_EDGES and len(edges) >= cfg.MAX_EDGES: | |
| logs.append("MAX_EDGES reached; stopping further edge additions") | |
| break | |
| # deduplicate edges | |
| edges = list(set(edges)) | |
| return nodes, edges, center_idx, node_meta, logs | |