Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| from typing import Dict, Any, List, Optional, Tuple | |
| import requests | |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type | |
| from cachetools import TTLCache | |
| from config import AppConfig | |
| UserAgent = "HF-Space-BTC-Abuse-GNN/1.0 (+https://huggingface.co/spaces)" | |
| class ExplorerError(Exception): | |
| pass | |
| def _req_json(url: str, timeout: int, retries: int = 2) -> Any: | |
| def _do(): | |
| r = requests.get(url, timeout=timeout, headers={"User-Agent": UserAgent}) | |
| if r.status_code != 200: | |
| raise ExplorerError(f"HTTP {r.status_code} for {url}") | |
| return r.json() | |
| return _do() | |
| def _satoshis_to_btc(v: Optional[int]) -> float: | |
| try: | |
| return float(v) / 1e8 if v is not None else 0.0 | |
| except Exception: | |
| return 0.0 | |
| def _normalize_tx_esplora(j: Dict[str, Any]) -> Dict[str, Any]: | |
| # https://mempool.space/api/tx/{txid} | |
| vin = j.get("vin", []) | |
| vout = j.get("vout", []) | |
| status = j.get("status", {}) or {} | |
| bh = status.get("block_height") | |
| bt = status.get("block_time") | |
| vin_list = [] | |
| for e in vin: | |
| p = e.get("prevout") or {} | |
| vin_list.append({ | |
| "txid": e.get("txid"), | |
| "vout": e.get("vout"), | |
| "prevout_value": p.get("value"), | |
| "prevout_address": p.get("scriptpubkey_address") or None | |
| }) | |
| vout_list = [] | |
| for idx, e in enumerate(vout): | |
| vout_list.append({ | |
| "n": idx, | |
| "value": e.get("value"), | |
| "address": e.get("scriptpubkey_address") or None | |
| }) | |
| return { | |
| "txid": j.get("txid") or j.get("hash"), | |
| "vin": vin_list, | |
| "vout": vout_list, | |
| "block_height": bh, | |
| "block_time": bt, | |
| } | |
| def _normalize_outspends_esplora(j: Any) -> List[Optional[str]]: | |
| # returns list aligned to outputs: each item has 'spent', 'txid' | |
| res = [] | |
| if isinstance(j, list): | |
| for e in j: | |
| if isinstance(e, dict) and e.get("spent"): | |
| res.append(e.get("txid")) | |
| else: | |
| res.append(None) | |
| return res | |
| class BaseExplorer: | |
| def __init__(self, cfg: AppConfig): | |
| self.cfg = cfg | |
| self.cache_tx = TTLCache(maxsize=10000, ttl=300) | |
| self.cache_out = TTLCache(maxsize=10000, ttl=300) | |
| def get_tx(self, txid: str) -> Dict[str, Any]: | |
| raise NotImplementedError | |
| def get_outspends(self, txid: str) -> List[Optional[str]]: | |
| raise NotImplementedError | |
| class MempoolSpaceClient(BaseExplorer): | |
| def __init__(self, cfg: AppConfig, base: str = "https://mempool.space"): | |
| super().__init__(cfg) | |
| self.base = base.rstrip("/") | |
| def get_tx(self, txid: str) -> Dict[str, Any]: | |
| if txid in self.cache_tx: | |
| return self.cache_tx[txid] | |
| url = f"{self.base}/api/tx/{txid}" | |
| j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES) | |
| tx = _normalize_tx_esplora(j) | |
| self.cache_tx[txid] = tx | |
| return tx | |
| def get_outspends(self, txid: str) -> List[Optional[str]]: | |
| if txid in self.cache_out: | |
| return self.cache_out[txid] | |
| url = f"{self.base}/api/tx/{txid}/outspends" | |
| j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES) | |
| out = _normalize_outspends_esplora(j) | |
| self.cache_out[txid] = out | |
| return out | |
| class BlockstreamClient(MempoolSpaceClient): | |
| def __init__(self, cfg: AppConfig): | |
| super().__init__(cfg, base="https://blockstream.info") | |
| class BlockchairClient(BaseExplorer): | |
| def __init__(self, cfg: AppConfig): | |
| super().__init__(cfg) | |
| self.base = "https://api.blockchair.com/bitcoin" | |
| def get_tx(self, txid: str) -> Dict[str, Any]: | |
| if txid in self.cache_tx: | |
| return self.cache_tx[txid] | |
| url = f"{self.base}/dashboards/transaction/{txid}" | |
| if self.cfg.BLOCKCHAIR_API_KEY: | |
| url += f"?key={self.cfg.BLOCKCHAIR_API_KEY}" | |
| j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES) | |
| data = j.get("data", {}).get(txid, {}) | |
| tx = data.get("transaction", {}) | |
| inputs = data.get("inputs", []) | |
| outputs = data.get("outputs", []) | |
| vin_list = [{ | |
| "txid": i.get("spending_transaction_hash") or i.get("recipient_transaction_hash"), | |
| "vout": i.get("spending_index"), | |
| "prevout_value": i.get("value"), | |
| "prevout_address": i.get("recipient"), | |
| } for i in inputs] | |
| vout_list = [{ | |
| "n": o.get("index"), | |
| "value": o.get("value"), | |
| "address": o.get("recipient"), | |
| } for o in outputs] | |
| out = { | |
| "txid": txid, | |
| "vin": vin_list, | |
| "vout": vout_list, | |
| "block_height": tx.get("block_id"), | |
| "block_time": tx.get("time"), | |
| } | |
| self.cache_tx[txid] = out | |
| return out | |
| def get_outspends(self, txid: str) -> List[Optional[str]]: | |
| # Blockchair includes outputs with 'spent_by_transaction_hash' | |
| if txid in self.cache_out: | |
| return self.cache_out[txid] | |
| url = f"{self.base}/dashboards/transaction/{txid}" | |
| if self.cfg.BLOCKCHAIR_API_KEY: | |
| url += f"?key={self.cfg.BLOCKCHAIR_API_KEY}" | |
| j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES) | |
| outputs = j.get("data", {}).get(txid, {}).get("outputs", []) | |
| res = [] | |
| for o in outputs: | |
| res.append(o.get("spent_by_transaction_hash")) | |
| self.cache_out[txid] = res | |
| return res | |
| def new_client(cfg: AppConfig, primary: str) -> List[BaseExplorer]: | |
| # primary then fallbacks | |
| primary = (primary or cfg.DATA_PROVIDER).lower() | |
| chain = [] | |
| if primary in ("mempool", "mempool.space"): | |
| chain = [MempoolSpaceClient(cfg), BlockstreamClient(cfg), BlockchairClient(cfg)] | |
| elif primary in ("blockstream", "blockstream.info"): | |
| chain = [BlockstreamClient(cfg), MempoolSpaceClient(cfg), BlockchairClient(cfg)] | |
| elif primary == "blockchair": | |
| chain = [BlockchairClient(cfg), MempoolSpaceClient(cfg), BlockstreamClient(cfg)] | |
| else: | |
| chain = [MempoolSpaceClient(cfg), BlockstreamClient(cfg), BlockchairClient(cfg)] | |
| return chain | |
| def fetch_with_fallback(txid: str, cfg: AppConfig, source: str): | |
| errors = [] | |
| for c in new_client(cfg, source): | |
| try: | |
| tx = c.get_tx(txid) | |
| outspends = c.get_outspends(txid) | |
| if tx and outspends is not None: | |
| return c, tx, outspends, None | |
| except Exception as e: | |
| errors.append(f"{c.__class__.__name__}: {e}") | |
| continue | |
| return None, None, None, errors | |