|
|
import asyncio |
|
|
import os |
|
|
from pathlib import Path |
|
|
from base64 import b64decode |
|
|
from json import load, dumps, loads |
|
|
from logging import getLogger |
|
|
from traceback import print_exc |
|
|
from typing import Optional |
|
|
|
|
|
from substrateinterface import Keypair |
|
|
from bittensor import wallet, async_subtensor |
|
|
|
|
|
from scorevision.utils.settings import get_settings |
|
|
from scorevision.utils.huggingface_helpers import get_huggingface_repo_name |
|
|
|
|
|
logger = getLogger(__name__) |
|
|
|
|
|
_SUBTENSOR = None |
|
|
|
|
|
|
|
|
def _coerce_last_update_value(value) -> Optional[int]: |
|
|
"""Convert last_update values from numpy / torch / python scalars into int.""" |
|
|
if value is None: |
|
|
return None |
|
|
try: |
|
|
if hasattr(value, "item"): |
|
|
value = value.item() |
|
|
except Exception: |
|
|
pass |
|
|
try: |
|
|
return int(value) |
|
|
except (TypeError, ValueError): |
|
|
return None |
|
|
|
|
|
|
|
|
def get_last_update_for_hotkey( |
|
|
meta, hotkey: str, pubkey_hex: str | None = None |
|
|
) -> Optional[int]: |
|
|
""" |
|
|
Return the validator's last_update height regardless of the underlying container type. |
|
|
""" |
|
|
if meta is None or not hotkey: |
|
|
return None |
|
|
|
|
|
last_update = getattr(meta, "last_update", None) |
|
|
if last_update is None: |
|
|
return None |
|
|
|
|
|
candidate_keys: list[str] = [] |
|
|
if hotkey: |
|
|
candidate_keys.append(hotkey) |
|
|
if pubkey_hex: |
|
|
variants = { |
|
|
pubkey_hex, |
|
|
pubkey_hex.lower(), |
|
|
pubkey_hex.upper(), |
|
|
} |
|
|
candidate_keys.extend([v for v in variants if v]) |
|
|
candidate_keys.extend([f"0x{v}" for v in variants if v]) |
|
|
seen: set[str] = set() |
|
|
if hasattr(last_update, "get"): |
|
|
for key in candidate_keys: |
|
|
if not key: |
|
|
continue |
|
|
if key in seen: |
|
|
continue |
|
|
seen.add(key) |
|
|
try: |
|
|
value = last_update.get(key) |
|
|
except Exception: |
|
|
value = None |
|
|
coerced = _coerce_last_update_value(value) |
|
|
if coerced is not None: |
|
|
return coerced |
|
|
|
|
|
index: Optional[int] = None |
|
|
hotkeys = getattr(meta, "hotkeys", None) |
|
|
hotkeys_list: Optional[list[str]] = None |
|
|
if hotkeys is not None: |
|
|
try: |
|
|
hotkeys_list = list(hotkeys) |
|
|
except TypeError: |
|
|
hotkeys_list = None |
|
|
|
|
|
if hotkeys_list: |
|
|
try: |
|
|
index = hotkeys_list.index(hotkey) |
|
|
except ValueError: |
|
|
index = None |
|
|
|
|
|
if index is None and hotkeys_list: |
|
|
for idx, hk in enumerate(hotkeys_list): |
|
|
if hk == hotkey: |
|
|
index = idx |
|
|
break |
|
|
|
|
|
if index is None: |
|
|
return None |
|
|
|
|
|
try: |
|
|
value = last_update[index] |
|
|
except Exception: |
|
|
return None |
|
|
return _coerce_last_update_value(value) |
|
|
|
|
|
|
|
|
def load_hotkey_keypair(wallet_name: str, hotkey_name: str) -> Keypair: |
|
|
settings = get_settings() |
|
|
|
|
|
wallet_dir = Path(settings.BITTENSOR_WALLET_PATH).expanduser() |
|
|
wallet_dir_str = str(wallet_dir).strip() |
|
|
if not wallet_dir_str or wallet_dir_str == ".": |
|
|
wallet_dir = Path.home() / ".bittensor" / "wallets" |
|
|
|
|
|
wallet_name = wallet_name or settings.BITTENSOR_WALLET_COLD |
|
|
hotkey_name = hotkey_name or settings.BITTENSOR_WALLET_HOT |
|
|
|
|
|
hotkey_dir = wallet_dir / wallet_name / "hotkeys" |
|
|
file_path = hotkey_dir / hotkey_name |
|
|
try: |
|
|
with open(file_path, "r") as file: |
|
|
keypair_data = load(file) |
|
|
seed = keypair_data["secretSeed"] |
|
|
keypair = Keypair.create_from_seed(seed) |
|
|
logger.info(f"Loaded keypair from {file_path}") |
|
|
return keypair |
|
|
except Exception as e: |
|
|
raise ValueError(f"Failed to load keypair: {e} (path={file_path})") |
|
|
|
|
|
|
|
|
async def get_subtensor(): |
|
|
global _SUBTENSOR |
|
|
settings = get_settings() |
|
|
|
|
|
endpoint = settings.BITTENSOR_SUBTENSOR_ENDPOINT |
|
|
init_timeout = float(os.getenv("SUBTENSOR_INIT_TIMEOUT_S", "15.0")) |
|
|
|
|
|
async def _init(ep: str): |
|
|
st = async_subtensor(ep) |
|
|
await asyncio.wait_for(st.initialize(), timeout=init_timeout) |
|
|
return st |
|
|
|
|
|
if _SUBTENSOR is None: |
|
|
try: |
|
|
logger.info("Initializing subtensor on %s", endpoint) |
|
|
_SUBTENSOR = await _init(endpoint) |
|
|
except Exception as e: |
|
|
logger.error("Subtensor init failed for %s: %s", endpoint, e) |
|
|
raise |
|
|
return _SUBTENSOR |
|
|
|
|
|
|
|
|
def reset_subtensor(): |
|
|
"""Clear cached subtensor client so next access reinitializes connection.""" |
|
|
global _SUBTENSOR |
|
|
_SUBTENSOR = None |
|
|
|
|
|
|
|
|
async def on_chain_commit( |
|
|
skip: bool, revision: str, chute_id: str, chute_slug: str | None |
|
|
) -> None: |
|
|
settings = get_settings() |
|
|
repo_name = get_huggingface_repo_name() |
|
|
w = wallet( |
|
|
name=settings.BITTENSOR_WALLET_COLD, |
|
|
hotkey=settings.BITTENSOR_WALLET_HOT, |
|
|
) |
|
|
payload = { |
|
|
"model": repo_name, |
|
|
"revision": revision, |
|
|
"chute_id": chute_id, |
|
|
"slug": chute_slug, |
|
|
"hotkey": w.hotkey.ss58_address, |
|
|
} |
|
|
logger.info(f"Commit payload: {payload}") |
|
|
try: |
|
|
if skip: |
|
|
raise Exception( |
|
|
f"No chute_id/slug; skipping on-chain commit for now. Payload would be: {payload}" |
|
|
) |
|
|
|
|
|
sub = await get_subtensor() |
|
|
|
|
|
await sub.set_reveal_commitment( |
|
|
wallet=w, |
|
|
netuid=settings.SCOREVISION_NETUID, |
|
|
data=dumps(payload), |
|
|
blocks_until_reveal=1, |
|
|
) |
|
|
logger.info("On-chain commitment submitted.") |
|
|
except Exception as e: |
|
|
logger.error(f"(Dry-run) On-chain commit skipped: {type(e).__name__}: {e}") |
|
|
|
|
|
|
|
|
async def _set_weights_with_confirmation( |
|
|
wallet, |
|
|
netuid: int, |
|
|
mechid: int | None, |
|
|
uids: list[int], |
|
|
weights: list[float], |
|
|
wait_for_inclusion: bool = False, |
|
|
retries: int = 10, |
|
|
delay_s: float = 2.0, |
|
|
log_prefix: str = "[sv-local]", |
|
|
) -> bool: |
|
|
import bittensor as bt |
|
|
|
|
|
settings = get_settings() |
|
|
confirm_blocks = max(1, int(os.getenv("SIGNER_CONFIRM_BLOCKS", "3"))) |
|
|
|
|
|
for attempt in range(retries): |
|
|
try: |
|
|
st = await get_subtensor() |
|
|
ref = await st.get_current_block() |
|
|
|
|
|
success, message = bt.subtensor( |
|
|
os.getenv("BITTENSOR_SUBTENSOR_ENDPOINT", "finney") |
|
|
).set_weights( |
|
|
wallet=wallet, |
|
|
netuid=netuid, |
|
|
mechid=mechid if mechid is not None else settings.SCOREVISION_MECHID, |
|
|
uids=uids, |
|
|
weights=weights, |
|
|
wait_for_inclusion=wait_for_inclusion, |
|
|
) |
|
|
if not success: |
|
|
logger.warning( |
|
|
f"{log_prefix} extrinsic submit failed: {message or 'unknown error'}" |
|
|
) |
|
|
else: |
|
|
logger.info( |
|
|
f"{log_prefix} extrinsic submitted; monitoring up to {confirm_blocks} block(s) … (ref {ref}, msg={message or ''})" |
|
|
) |
|
|
latest_lu = None |
|
|
target_mechid = ( |
|
|
mechid if mechid is not None else settings.SCOREVISION_MECHID |
|
|
) |
|
|
hotkey = wallet.hotkey.ss58_address |
|
|
for wait_idx in range(confirm_blocks): |
|
|
await st.wait_for_block() |
|
|
meta = await st.metagraph(netuid, mechid=target_mechid) |
|
|
try: |
|
|
meta_hotkeys = getattr(meta, "hotkeys", []) or [] |
|
|
try: |
|
|
hotkey_present = hotkey in meta_hotkeys |
|
|
except TypeError: |
|
|
try: |
|
|
hotkey_present = hotkey in list(meta_hotkeys) |
|
|
except TypeError: |
|
|
hotkey_present = False |
|
|
if not hotkey_present: |
|
|
logger.warning( |
|
|
f"{log_prefix} wallet hotkey not found in metagraph; retry…" |
|
|
) |
|
|
break |
|
|
|
|
|
latest_lu = get_last_update_for_hotkey( |
|
|
meta, hotkey, pubkey_hex=wallet.hotkey.public_key.hex() |
|
|
) |
|
|
if latest_lu is None: |
|
|
logger.warning( |
|
|
f"{log_prefix} wallet hotkey found but no last_update entry; retry…" |
|
|
) |
|
|
break |
|
|
if latest_lu >= ref: |
|
|
logger.info( |
|
|
f"{log_prefix} confirmation OK (last_update {latest_lu} >= ref {ref} after {wait_idx + 1} block(s))" |
|
|
) |
|
|
return True |
|
|
logger.debug( |
|
|
f"{log_prefix} waiting for inclusion… (last_update {latest_lu} < ref {ref}, waited {wait_idx + 1}/{confirm_blocks} block(s))" |
|
|
) |
|
|
finally: |
|
|
|
|
|
del meta |
|
|
if latest_lu is not None: |
|
|
logger.warning( |
|
|
f"{log_prefix} not included after {confirm_blocks} block(s) (last_update {latest_lu} < ref {ref}), retry…" |
|
|
) |
|
|
else: |
|
|
logger.warning( |
|
|
f"{log_prefix} not included after {confirm_blocks} block(s) (hotkey missing), retry…" |
|
|
) |
|
|
except Exception as e: |
|
|
logger.warning( |
|
|
f"{log_prefix} attempt {attempt+1}/{retries} error: {type(e).__name__}: {e}" |
|
|
) |
|
|
await asyncio.sleep(delay_s) |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def on_chain_commit_validator(index_url: str) -> None: |
|
|
""" """ |
|
|
settings = get_settings() |
|
|
w = wallet( |
|
|
name=settings.BITTENSOR_WALLET_COLD, |
|
|
hotkey=settings.BITTENSOR_WALLET_HOT, |
|
|
) |
|
|
payload = { |
|
|
"role": "validator", |
|
|
"hotkey": w.hotkey.ss58_address, |
|
|
"index_url": index_url, |
|
|
"chute_name": settings.CHUTES_USERNAME, |
|
|
"version": 1, |
|
|
} |
|
|
logger.info(f"[validator-commit] {payload}") |
|
|
try: |
|
|
sub = await get_subtensor() |
|
|
await sub.set_reveal_commitment( |
|
|
wallet=w, |
|
|
netuid=settings.SCOREVISION_NETUID, |
|
|
data=dumps(payload), |
|
|
blocks_until_reveal=1, |
|
|
) |
|
|
logger.info("[validator-commit] On-chain commitment submitted.") |
|
|
except Exception as e: |
|
|
logger.error(f"[validator-commit] failed: {type(e).__name__}: {e}") |
|
|
|
|
|
|
|
|
async def get_validator_indexes_from_chain(netuid: int | None = None) -> dict[str, str]: |
|
|
""" """ |
|
|
settings = get_settings() |
|
|
netuid = netuid if netuid is not None else settings.SCOREVISION_NETUID |
|
|
st = await get_subtensor() |
|
|
meta = await st.metagraph(netuid, mechid=settings.SCOREVISION_MECHID) |
|
|
commits = await st.get_all_revealed_commitments(netuid) |
|
|
|
|
|
stake_tensor = getattr(meta, "S", None) |
|
|
if stake_tensor is None: |
|
|
stake_tensor = getattr(meta, "stake", None) |
|
|
|
|
|
result: dict[str, str] = {} |
|
|
for i, hk in enumerate(meta.hotkeys): |
|
|
arr = commits.get(hk) |
|
|
if not arr: |
|
|
continue |
|
|
_, data = arr[-1] |
|
|
try: |
|
|
obj = loads(data) |
|
|
except Exception: |
|
|
continue |
|
|
if not isinstance(obj, dict): |
|
|
continue |
|
|
if obj.get("role") == "validator": |
|
|
stake_val = 0.0 |
|
|
try: |
|
|
val = stake_tensor[i] |
|
|
if hasattr(val, "item"): |
|
|
val = val.item() |
|
|
stake_val = float(val) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
if stake_val >= 1000.0: |
|
|
url = obj.get("index_url") |
|
|
if isinstance(url, str) and url.startswith("http"): |
|
|
result[hk] = url |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
async def _already_committed_same_index(netuid: int, index_url: str) -> bool: |
|
|
""" """ |
|
|
settings = get_settings() |
|
|
st = await get_subtensor() |
|
|
meta = await st.metagraph(netuid, mechid=settings.SCOREVISION_MECHID) |
|
|
commits = await st.get_all_revealed_commitments(netuid) |
|
|
|
|
|
w = wallet( |
|
|
name=settings.BITTENSOR_WALLET_COLD, |
|
|
hotkey=settings.BITTENSOR_WALLET_HOT, |
|
|
) |
|
|
hk = w.hotkey.ss58_address |
|
|
arr = commits.get(hk) |
|
|
if not arr: |
|
|
return False |
|
|
try: |
|
|
_, data = arr[-1] |
|
|
obj = loads(data) |
|
|
except Exception: |
|
|
return False |
|
|
return ( |
|
|
isinstance(obj, dict) |
|
|
and obj.get("role") == "validator" |
|
|
and str(obj.get("index_url")) == str(index_url) |
|
|
) |
|
|
|
|
|
|
|
|
async def _first_commit_block_by_miner( |
|
|
netuid: int, |
|
|
*, |
|
|
retries: int = 2, |
|
|
) -> dict[str, int]: |
|
|
"""""" |
|
|
attempt = 0 |
|
|
while True: |
|
|
try: |
|
|
st = await get_subtensor() |
|
|
settings = get_settings() |
|
|
|
|
|
meta = await st.metagraph(netuid, mechid=settings.SCOREVISION_MECHID) |
|
|
commits = await st.get_all_revealed_commitments(netuid) |
|
|
|
|
|
last_block_by_hk: dict[str, int] = {} |
|
|
for hk in meta.hotkeys: |
|
|
arr = commits.get(hk) |
|
|
if not arr: |
|
|
continue |
|
|
|
|
|
last_block = None |
|
|
for tup in arr: |
|
|
try: |
|
|
blk, data = tup |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
try: |
|
|
obj = loads(data) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
if isinstance(obj, dict) and obj.get("role") == "validator": |
|
|
continue |
|
|
|
|
|
try: |
|
|
blk_int = int(blk) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
if last_block is None or blk_int > last_block: |
|
|
last_block = blk_int |
|
|
|
|
|
if last_block is not None: |
|
|
last_block_by_hk[hk] = last_block |
|
|
|
|
|
return last_block_by_hk |
|
|
|
|
|
except Exception as e: |
|
|
attempt += 1 |
|
|
logger.warning( |
|
|
"[first_commit_block_by_miner] error on attempt %d/%d: %s: %s — resetting subtensor", |
|
|
attempt, |
|
|
retries, |
|
|
type(e).__name__, |
|
|
e, |
|
|
) |
|
|
reset_subtensor() |
|
|
|
|
|
if attempt > retries: |
|
|
raise |
|
|
|
|
|
await asyncio.sleep(1.0) |
|
|
|
|
|
|
|
|
async def _wait_n_blocks(n: int, timeout_per_block: float = 30.0) -> None: |
|
|
"""Wait for n new blocks on the current subtensor client.""" |
|
|
if n <= 0: |
|
|
return |
|
|
st = await get_subtensor() |
|
|
for i in range(n): |
|
|
try: |
|
|
await asyncio.wait_for(st.wait_for_block(), timeout=timeout_per_block) |
|
|
except asyncio.TimeoutError: |
|
|
logger.warning( |
|
|
"[commit-retry] wait_for_block timed out (i=%d/%d), continuing…", |
|
|
i + 1, |
|
|
n, |
|
|
) |
|
|
continue |
|
|
|
|
|
|
|
|
async def on_chain_commit_validator_retry( |
|
|
index_url: str, |
|
|
*, |
|
|
wait_blocks: int = 100, |
|
|
confirm_after: int = 3, |
|
|
max_retries: int | None = None, |
|
|
) -> bool: |
|
|
""" """ |
|
|
settings = get_settings() |
|
|
w = wallet( |
|
|
name=settings.BITTENSOR_WALLET_COLD, |
|
|
hotkey=settings.BITTENSOR_WALLET_HOT, |
|
|
) |
|
|
|
|
|
if await _already_committed_same_index(settings.SCOREVISION_NETUID, index_url): |
|
|
logger.info("[validator-commit] Already published %s; skipping.", index_url) |
|
|
return True |
|
|
|
|
|
attempt = 0 |
|
|
while True: |
|
|
attempt += 1 |
|
|
try: |
|
|
sub = await get_subtensor() |
|
|
logger.info("[validator-commit] attempt #%d submitting…", attempt) |
|
|
await sub.set_reveal_commitment( |
|
|
wallet=w, |
|
|
netuid=settings.SCOREVISION_NETUID, |
|
|
data=dumps( |
|
|
{ |
|
|
"role": "validator", |
|
|
"hotkey": w.hotkey.ss58_address, |
|
|
"index_url": index_url, |
|
|
"chute_name": settings.CHUTES_USERNAME, |
|
|
"version": 1, |
|
|
} |
|
|
), |
|
|
blocks_until_reveal=1, |
|
|
) |
|
|
logger.info( |
|
|
"[validator-commit] submitted; waiting %d block(s) for confirm check…", |
|
|
confirm_after, |
|
|
) |
|
|
await _wait_n_blocks(confirm_after) |
|
|
|
|
|
if await _already_committed_same_index( |
|
|
settings.SCOREVISION_NETUID, index_url |
|
|
): |
|
|
logger.info("[validator-commit] confirmed on-chain.") |
|
|
return True |
|
|
|
|
|
logger.warning( |
|
|
"[validator-commit] not visible yet after %d blocks; will retry after %d more blocks.", |
|
|
confirm_after, |
|
|
wait_blocks, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning( |
|
|
"[validator-commit] attempt #%d failed: %s: %s", |
|
|
attempt, |
|
|
type(e).__name__, |
|
|
e, |
|
|
) |
|
|
|
|
|
if max_retries is not None and attempt >= max_retries: |
|
|
logger.error("[validator-commit] giving up after %d attempts.", attempt) |
|
|
return False |
|
|
|
|
|
await _wait_n_blocks(wait_blocks) |
|
|
|