# MLGraph-Bitcoin-GAD / explorers.py
# Uploaded by thanhphxu using huggingface_hub (commit db886e4, verified).
import os
import json
import time
from typing import Dict, Any, List, Optional, Tuple
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from cachetools import TTLCache
from config import AppConfig
# Value of the User-Agent header that _req_json sends with each HTTP request.
UserAgent = "HF-Space-BTC-Abuse-GNN/1.0 (+https://huggingface.co/spaces)"
class ExplorerError(Exception):
    """Error raised for failed block-explorer API responses (e.g. a non-200 HTTP status)."""
def _req_json(url: str, timeout: int, retries: int = 2) -> Any:
    """Fetch ``url`` with HTTP GET and return the decoded JSON body.

    Timeouts and connection errors are retried with exponential backoff
    (waits clamped between 0.5 and 4 seconds). Other failures are not retried.

    Args:
        url: Absolute URL to request.
        timeout: Timeout value handed to ``requests.get``.
        retries: Maximum number of attempts in total (passed to
            ``stop_after_attempt``), not the number of extra retries.

    Returns:
        The parsed JSON payload.

    Raises:
        ExplorerError: On a non-200 status code or a body that is not valid JSON.
        requests.Timeout, requests.ConnectionError: When every attempt failed
            at the transport level.
    """
    # Never echo the query string in error messages: it may carry an API key
    # (e.g. "?key=...") and these messages are collected for display by callers.
    safe_url = url.split("?", 1)[0]

    @retry(
        stop=stop_after_attempt(retries),
        wait=wait_exponential(min=0.5, max=4),
        retry=retry_if_exception_type((requests.Timeout, requests.ConnectionError)),
        # Re-raise the last requests exception instead of tenacity's opaque
        # RetryError so callers can report the real cause.
        reraise=True,
    )
    def _do():
        r = requests.get(url, timeout=timeout, headers={"User-Agent": UserAgent})
        if r.status_code != 200:
            raise ExplorerError(f"HTTP {r.status_code} for {safe_url}")
        try:
            return r.json()
        except ValueError as exc:
            # requests signals an undecodable body with a ValueError subclass.
            raise ExplorerError(f"Invalid JSON from {safe_url}") from exc

    return _do()
def _satoshis_to_btc(v: Optional[int]) -> float:
try:
return float(v) / 1e8 if v is not None else 0.0
except Exception:
return 0.0
def _normalize_tx_esplora(j: Dict[str, Any]) -> Dict[str, Any]:
# https://mempool.space/api/tx/{txid}
vin = j.get("vin", [])
vout = j.get("vout", [])
status = j.get("status", {}) or {}
bh = status.get("block_height")
bt = status.get("block_time")
vin_list = []
for e in vin:
p = e.get("prevout") or {}
vin_list.append({
"txid": e.get("txid"),
"vout": e.get("vout"),
"prevout_value": p.get("value"),
"prevout_address": p.get("scriptpubkey_address") or None
})
vout_list = []
for idx, e in enumerate(vout):
vout_list.append({
"n": idx,
"value": e.get("value"),
"address": e.get("scriptpubkey_address") or None
})
return {
"txid": j.get("txid") or j.get("hash"),
"vin": vin_list,
"vout": vout_list,
"block_height": bh,
"block_time": bt,
}
def _normalize_outspends_esplora(j: Any) -> List[Optional[str]]:
# returns list aligned to outputs: each item has 'spent', 'txid'
res = []
if isinstance(j, list):
for e in j:
if isinstance(e, dict) and e.get("spent"):
res.append(e.get("txid"))
else:
res.append(None)
return res
class BaseExplorer:
    """Common base for explorer clients.

    Stores the application config and two per-instance TTL caches, one for
    normalized transactions and one for outspend lists. Subclasses implement
    ``get_tx`` and ``get_outspends``.
    """

    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
        # Both caches share the same limits: 10000 entries, ttl of 300.
        cache_settings = {"maxsize": 10000, "ttl": 300}
        self.cache_tx = TTLCache(**cache_settings)
        self.cache_out = TTLCache(**cache_settings)

    def get_tx(self, txid: str) -> Dict[str, Any]:
        """Return the normalized transaction for ``txid`` (subclass hook)."""
        raise NotImplementedError

    def get_outspends(self, txid: str) -> List[Optional[str]]:
        """Return the spending txid (or ``None``) for each output of ``txid`` (subclass hook)."""
        raise NotImplementedError
class MempoolSpaceClient(BaseExplorer):
    """Client for an Esplora-style REST API, defaulting to mempool.space.

    Args:
        cfg: Application config; ``HTTP_TIMEOUT`` and ``HTTP_RETRIES`` are read
            for every request.
        base: Base URL of the service; a trailing slash is stripped.
    """

    def __init__(self, cfg: AppConfig, base: str = "https://mempool.space"):
        super().__init__(cfg)
        self.base = base.rstrip("/")

    def get_tx(self, txid: str) -> Dict[str, Any]:
        """Return the normalized transaction for ``txid``, using the cache when possible.

        Raises whatever ``_req_json`` raises when the request fails.
        """
        # Single EAFP lookup: an "in" check followed by indexing can raise
        # KeyError if the TTL entry expires between the two operations.
        try:
            return self.cache_tx[txid]
        except KeyError:
            pass
        url = f"{self.base}/api/tx/{txid}"
        j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES)
        tx = _normalize_tx_esplora(j)
        self.cache_tx[txid] = tx
        return tx

    def get_outspends(self, txid: str) -> List[Optional[str]]:
        """Return, per output of ``txid``, the spending txid or ``None``.

        Raises whatever ``_req_json`` raises when the request fails.
        """
        # Same single-lookup pattern as get_tx to avoid the expiry race.
        try:
            return self.cache_out[txid]
        except KeyError:
            pass
        url = f"{self.base}/api/tx/{txid}/outspends"
        j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES)
        out = _normalize_outspends_esplora(j)
        self.cache_out[txid] = out
        return out
class BlockstreamClient(MempoolSpaceClient):
    """Same client as ``MempoolSpaceClient``, pointed at blockstream.info."""

    def __init__(self, cfg: AppConfig):
        blockstream_base = "https://blockstream.info"
        super().__init__(cfg, base=blockstream_base)
class BlockchairClient(BaseExplorer):
    """Explorer client for the Blockchair Bitcoin API.

    ``get_tx`` and ``get_outspends`` are both derived from the same
    ``dashboards/transaction/{txid}`` payload, so one request fills both caches.
    """

    def __init__(self, cfg: AppConfig):
        super().__init__(cfg)
        self.base = "https://api.blockchair.com/bitcoin"

    def _load_dashboard(self, txid: str) -> Tuple[Dict[str, Any], List[Optional[str]]]:
        """Fetch the dashboard for ``txid`` once; cache and return (tx, outspends).

        Raises:
            ExplorerError: If the response holds no entry for ``txid``, or for
                the HTTP-level failures reported by ``_req_json``.
        """
        url = f"{self.base}/dashboards/transaction/{txid}"
        if self.cfg.BLOCKCHAIR_API_KEY:
            url += f"?key={self.cfg.BLOCKCHAIR_API_KEY}"
        j = _req_json(url, timeout=self.cfg.HTTP_TIMEOUT, retries=self.cfg.HTTP_RETRIES)

        payload = j.get("data") if isinstance(j, dict) else None
        entry = payload.get(txid) if isinstance(payload, dict) else None
        if not isinstance(entry, dict):
            # Fail loudly so the fallback chain tries the next provider rather
            # than caching a phantom transaction with no inputs or outputs.
            raise ExplorerError(f"Blockchair returned no data for {txid}")

        meta = entry.get("transaction") or {}
        inputs = entry.get("inputs") or []
        outputs = entry.get("outputs") or []

        # Per the Blockchair API docs, each input row describes the output
        # being spent: "transaction_hash"/"index" identify the previous
        # output, whereas "spending_*" refer to the current transaction.
        # The previously used keys remain as fallbacks.
        vin_list = [{
            "txid": (i.get("transaction_hash")
                     or i.get("spending_transaction_hash")
                     or i.get("recipient_transaction_hash")),
            "vout": i.get("index") if i.get("index") is not None else i.get("spending_index"),
            "prevout_value": i.get("value"),
            "prevout_address": i.get("recipient"),
        } for i in inputs]
        vout_list = [{
            "n": o.get("index"),
            "value": o.get("value"),
            "address": o.get("recipient"),
        } for o in outputs]
        tx = {
            "txid": txid,
            "vin": vin_list,
            "vout": vout_list,
            "block_height": meta.get("block_id"),
            # NOTE(review): Blockchair's "time" looks like a formatted string
            # while the Esplora clients pass through "block_time" as-is;
            # confirm what downstream consumers expect.
            "block_time": meta.get("time"),
        }
        # Blockchair reports the spender under "spending_transaction_hash";
        # the old key is kept as a fallback for compatibility.
        outspends = [
            o.get("spending_transaction_hash") or o.get("spent_by_transaction_hash")
            for o in outputs
        ]
        self.cache_tx[txid] = tx
        self.cache_out[txid] = outspends
        return tx, outspends

    def get_tx(self, txid: str) -> Dict[str, Any]:
        """Return the normalized transaction for ``txid``, using the cache when possible."""
        # Single EAFP lookup: "in" followed by indexing can raise KeyError if
        # the TTL entry expires between the two operations.
        try:
            return self.cache_tx[txid]
        except KeyError:
            pass
        tx, _ = self._load_dashboard(txid)
        return tx

    def get_outspends(self, txid: str) -> List[Optional[str]]:
        """Return, per output of ``txid``, the spending txid or ``None`` if unspent."""
        try:
            return self.cache_out[txid]
        except KeyError:
            pass
        _, outspends = self._load_dashboard(txid)
        return outspends
def new_client(cfg: AppConfig, primary: str) -> List[BaseExplorer]:
    """Build the ordered list of explorer clients: preferred provider first, then fallbacks.

    ``primary`` falls back to ``cfg.DATA_PROVIDER`` when empty and is compared
    case-insensitively. Unrecognised names get the mempool.space-first order.
    """
    provider = (primary or cfg.DATA_PROVIDER).lower()
    if provider in ("blockstream", "blockstream.info"):
        order = (BlockstreamClient, MempoolSpaceClient, BlockchairClient)
    elif provider == "blockchair":
        order = (BlockchairClient, MempoolSpaceClient, BlockstreamClient)
    else:
        # "mempool", "mempool.space" and every unknown name share this order.
        order = (MempoolSpaceClient, BlockstreamClient, BlockchairClient)
    return [client_cls(cfg) for client_cls in order]
def fetch_with_fallback(
    txid: str, cfg: AppConfig, source: str
) -> Tuple[
    Optional[BaseExplorer],
    Optional[Dict[str, Any]],
    Optional[List[Optional[str]]],
    Optional[List[str]],
]:
    """Fetch a transaction and its outspends, trying each provider in turn.

    Args:
        txid: Transaction id to look up.
        cfg: Application config handed to ``new_client``.
        source: Preferred provider name; decides the order of the chain.

    Returns:
        ``(client, tx, outspends, None)`` from the first provider that
        succeeds, or ``(None, None, None, errors)`` when all of them fail,
        where ``errors`` has one ``"<ClientClass>: <reason>"`` string per
        failed provider.
    """
    # NOTE(review): new_client builds fresh client instances on every call, so
    # the per-instance TTL caches are not reused across calls to this function.
    errors: List[str] = []
    for c in new_client(cfg, source):
        try:
            tx = c.get_tx(txid)
            outspends = c.get_outspends(txid)
        except Exception as e:
            # Provider boundary: any failure just moves on to the next one.
            errors.append(f"{c.__class__.__name__}: {e}")
            continue
        if tx and outspends is not None:
            return c, tx, outspends, None
        # Record unusable results too, so the error list is never silently
        # empty when every provider was skipped.
        errors.append(f"{c.__class__.__name__}: empty response")
    return None, None, None, errors