"""HTTP clients for public Bitcoin block explorers.

Each client normalizes its provider's response into a common shape:

* transaction: ``{"txid", "vin": [...], "vout": [...], "block_height",
  "block_time"}`` where every ``vin`` entry describes the *funding* outpoint
  (``txid``/``vout``) plus ``prevout_value``/``prevout_address``, and every
  ``vout`` entry has ``n``/``value``/``address``;
* outspends: a list aligned with the outputs, holding the spending txid or
  ``None`` when the output is unspent.

``fetch_with_fallback`` tries the configured provider first and then the
remaining ones.
"""

import os
import json
import time
from typing import Dict, Any, List, Optional, Tuple

import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from cachetools import TTLCache

from config import AppConfig

UserAgent = "HF-Space-BTC-Abuse-GNN/1.0 (+https://huggingface.co/spaces)"


class ExplorerError(Exception):
    """Raised when an explorer request fails or returns unusable data."""


def _req_json(
    url: str,
    timeout: int,
    retries: int = 2,
    params: Optional[Dict[str, Any]] = None,
) -> Any:
    """GET ``url`` and return the decoded JSON body.

    Args:
        url: Endpoint to query. It is echoed in error messages, so it must
            not contain secrets; pass those through ``params`` instead.
        timeout: Per-request timeout in seconds, handed to ``requests``.
        retries: Total number of attempts made on timeouts / connection
            errors (exponential back-off between attempts).
        params: Optional query-string parameters (e.g. an API key). They are
            sent with the request but never included in error messages.

    Raises:
        ExplorerError: on a non-200 status, a non-JSON body, or when every
            attempt timed out / failed to connect.
    """

    @retry(
        stop=stop_after_attempt(retries),
        wait=wait_exponential(min=0.5, max=4),
        retry=retry_if_exception_type((requests.Timeout, requests.ConnectionError)),
        # Surface the underlying requests exception instead of tenacity's
        # opaque RetryError once the attempts are exhausted.
        reraise=True,
    )
    def _do():
        r = requests.get(
            url,
            params=params,
            timeout=timeout,
            headers={"User-Agent": UserAgent},
        )
        if r.status_code != 200:
            raise ExplorerError(f"HTTP {r.status_code} for {url}")
        try:
            return r.json()
        except ValueError as e:  # requests' JSONDecodeError subclasses ValueError
            raise ExplorerError(f"Invalid JSON body from {url}") from e

    try:
        return _do()
    except (requests.Timeout, requests.ConnectionError) as e:
        # requests' own messages embed the full request URL (query string
        # included), so report only the exception type and the bare url.
        raise ExplorerError(f"{type(e).__name__} for {url}") from e


def _satoshis_to_btc(v: Optional[int]) -> float:
    """Convert a satoshi amount to BTC; ``None`` or unparsable input gives 0.0."""
    if v is None:
        return 0.0
    try:
        return float(v) / 1e8
    except (TypeError, ValueError, OverflowError):
        return 0.0


def _normalize_tx_esplora(j: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize an Esplora-style transaction object.

    Source format: https://mempool.space/api/tx/{txid}
    """
    # ``or`` also covers keys that are present but explicitly null.
    status = j.get("status") or {}

    vin_list = []
    for e in j.get("vin") or []:
        p = e.get("prevout") or {}
        vin_list.append({
            "txid": e.get("txid"),
            "vout": e.get("vout"),
            "prevout_value": p.get("value"),
            "prevout_address": p.get("scriptpubkey_address") or None,
        })

    vout_list = [
        {
            "n": idx,
            "value": e.get("value"),
            "address": e.get("scriptpubkey_address") or None,
        }
        for idx, e in enumerate(j.get("vout") or [])
    ]

    return {
        "txid": j.get("txid") or j.get("hash"),
        "vin": vin_list,
        "vout": vout_list,
        "block_height": status.get("block_height"),
        "block_time": status.get("block_time"),
    }


def _normalize_outspends_esplora(j: Any) -> List[Optional[str]]:
    """Map an Esplora ``outspends`` list to spending txids.

    The input is a list aligned with the outputs whose items carry ``spent``
    and ``txid``. Unspent (or malformed) items become ``None``; a non-list
    payload yields an empty list.
    """
    if not isinstance(j, list):
        return []
    return [
        e.get("txid") if isinstance(e, dict) and e.get("spent") else None
        for e in j
    ]


class BaseExplorer:
    """Common state for explorer clients: config plus per-instance TTL caches."""

    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
        self.cache_tx = TTLCache(maxsize=10000, ttl=300)
        self.cache_out = TTLCache(maxsize=10000, ttl=300)

    def get_tx(self, txid: str) -> Dict[str, Any]:
        """Return the normalized transaction for ``txid``."""
        raise NotImplementedError

    def get_outspends(self, txid: str) -> List[Optional[str]]:
        """Return the spending txid (or ``None``) for each output of ``txid``."""
        raise NotImplementedError


class MempoolSpaceClient(BaseExplorer):
    """Client for Esplora-compatible REST APIs (mempool.space by default)."""

    def __init__(self, cfg: AppConfig, base: str = "https://mempool.space"):
        super().__init__(cfg)
        self.base = base.rstrip("/")

    def get_tx(self, txid: str) -> Dict[str, Any]:
        # Single lookup: ``in`` followed by ``[]`` can raise KeyError if the
        # entry's TTL expires between the two operations.
        cached = self.cache_tx.get(txid)
        if cached is not None:
            return cached
        url = f"{self.base}/api/tx/{txid}"
        j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES)
        tx = _normalize_tx_esplora(j)
        self.cache_tx[txid] = tx
        return tx

    def get_outspends(self, txid: str) -> List[Optional[str]]:
        cached = self.cache_out.get(txid)
        if cached is not None:
            return cached
        url = f"{self.base}/api/tx/{txid}/outspends"
        j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES)
        out = _normalize_outspends_esplora(j)
        self.cache_out[txid] = out
        return out


class BlockstreamClient(MempoolSpaceClient):
    """Esplora client pointed at blockstream.info."""

    def __init__(self, cfg: AppConfig):
        super().__init__(cfg, base="https://blockstream.info")


class BlockchairClient(BaseExplorer):
    """Client for the Blockchair ``dashboards/transaction`` endpoint."""

    def __init__(self, cfg: AppConfig):
        super().__init__(cfg)
        self.base = "https://api.blockchair.com/bitcoin"

    def _load_dashboard(self, txid: str) -> Tuple[Dict[str, Any], List[Optional[str]]]:
        """Fetch the dashboard once and fill both caches from it.

        The transaction and its outspends come from the same response, so a
        single request serves ``get_tx`` and ``get_outspends``.

        Raises:
            ExplorerError: if the request fails or the response carries no
                entry for ``txid``.
        """
        url = f"{self.base}/dashboards/transaction/{txid}"
        api_key = self.cfg.BLOCKCHAIR_API_KEY
        # The key travels as a query parameter so it never ends up in the
        # url that _req_json echoes in error messages.
        params = {"key": api_key} if api_key else None
        j = _req_json(
            url,
            timeout=self.cfg.HTTP_TIMEOUT,
            retries=self.cfg.HTTP_RETRIES,
            params=params,
        )

        data = j.get("data") if isinstance(j, dict) else None
        entry = None
        if isinstance(data, dict):
            entry = data.get(txid) or data.get(txid.lower())
        if not isinstance(entry, dict):
            # Raising (instead of returning an empty transaction) lets
            # fetch_with_fallback move on to the next provider.
            raise ExplorerError(f"Blockchair returned no data for transaction {txid}")

        tx_meta = entry.get("transaction") or {}
        inputs = entry.get("inputs") or []
        outputs = entry.get("outputs") or []

        # Blockchair describes each input as the output being spent:
        # ``transaction_hash``/``index`` identify the funding outpoint, while
        # the ``spending_*`` fields refer to the queried transaction itself.
        vin_list = [
            {
                "txid": i.get("transaction_hash"),
                "vout": i.get("index"),
                "prevout_value": i.get("value"),
                "prevout_address": i.get("recipient"),
            }
            for i in inputs
        ]
        vout_list = [
            {
                "n": o.get("index"),
                "value": o.get("value"),
                "address": o.get("recipient"),
            }
            for o in outputs
        ]
        tx = {
            "txid": txid,
            "vin": vin_list,
            "vout": vout_list,
            "block_height": tx_meta.get("block_id"),
            # NOTE(review): Blockchair appears to report ``time`` as a
            # formatted string whereas the Esplora clients return an integer
            # ``block_time`` - confirm what downstream consumers expect.
            "block_time": tx_meta.get("time"),
        }
        # ``spent_by_transaction_hash`` is kept only as a fallback for
        # payloads that might use the name the previous code looked for.
        outspends = [
            o.get("spending_transaction_hash") or o.get("spent_by_transaction_hash") or None
            for o in outputs
        ]

        self.cache_tx[txid] = tx
        self.cache_out[txid] = outspends
        return tx, outspends

    def get_tx(self, txid: str) -> Dict[str, Any]:
        cached = self.cache_tx.get(txid)
        if cached is not None:
            return cached
        tx, _ = self._load_dashboard(txid)
        return tx

    def get_outspends(self, txid: str) -> List[Optional[str]]:
        cached = self.cache_out.get(txid)
        if cached is not None:
            return cached
        _, outspends = self._load_dashboard(txid)
        return outspends


def new_client(cfg: AppConfig, primary: str) -> List[BaseExplorer]:
    """Build the provider chain: the primary client first, then the fallbacks.

    ``primary`` falls back to ``cfg.DATA_PROVIDER`` when empty; unknown or
    missing names select the mempool.space-first ordering.

    NOTE(review): every call builds fresh clients, so their TTL caches do not
    persist across calls - confirm whether the chain should be reused.
    """
    chosen = (primary or cfg.DATA_PROVIDER or "").strip().lower()
    if chosen in ("blockstream", "blockstream.info"):
        order = [BlockstreamClient, MempoolSpaceClient, BlockchairClient]
    elif chosen == "blockchair":
        order = [BlockchairClient, MempoolSpaceClient, BlockstreamClient]
    else:
        # "mempool", "mempool.space" and anything unrecognized.
        order = [MempoolSpaceClient, BlockstreamClient, BlockchairClient]
    return [client_cls(cfg) for client_cls in order]


def fetch_with_fallback(
    txid: str, cfg: AppConfig, source: str
) -> Tuple[
    Optional[BaseExplorer],
    Optional[Dict[str, Any]],
    Optional[List[Optional[str]]],
    Optional[List[str]],
]:
    """Fetch a transaction and its outspends from the first provider that works.

    Returns:
        ``(client, tx, outspends, None)`` on success, or
        ``(None, None, None, errors)`` where ``errors`` holds one message per
        provider that failed or returned nothing.
    """
    errors: List[str] = []
    for client in new_client(cfg, source):
        name = type(client).__name__
        try:
            tx = client.get_tx(txid)
            outspends = client.get_outspends(txid)
        except Exception as e:  # provider boundary: record and try the next one
            errors.append(f"{name}: {e}")
            continue
        if tx and outspends is not None:
            return client, tx, outspends, None
        errors.append(f"{name}: empty response")
    return None, None, None, errors