Spaces:
Sleeping
Sleeping
| import os | |
| import traceback | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| from threading import Lock | |
| from typing import TypeAlias | |
| import requests | |
| from cachetools import TTLCache, cached | |
| from fiber import constants | |
| from fiber.chain.interface import get_substrate | |
| from fiber.chain.metagraph import Metagraph | |
| from pydantic import BaseModel | |
| from substrateinterface import SubstrateInterface | |
| from substrateinterface.storage import StorageKey | |
| from network_commitments import Decoder | |
| from src import Key, Uid, TIMEZONE | |
| TAOSTATS_API_KEY = os.getenv("TAOSTATS_API_KEY") | |
| Weight: TypeAlias = float | |
| Incentive: TypeAlias = float | |
class ContestId(Enum):
    """Contest identifiers; each value is the uint16 id read from a commitment."""

    FLUX_NVIDIA_4090 = 0
    SDXL_NEWDREAM_NVIDIA_4090 = 1
class Neuron(BaseModel):
    """One entry of the subnet metagraph as returned by the taostats API.

    Instances are built by ``get_neurons()``, which copies each field from
    the JSON key of the same name.
    """

    # ss58 addresses of the neuron's keys.
    hotkey: str
    coldkey: str

    validator_trust: float
    # Neurons with a permit are the ones queried for weights and identities.
    validator_permit: bool
    incentive: float

    updated: int
    uid: int
    # Block the snapshot was taken at; get_latest_block() takes the maximum.
    block_number: int
class Commitment(BaseModel):
    """Submission metadata decoded from a ``Commitments.CommitmentOf`` entry."""

    # Repository location, assembled as https://<provider>/<repository>.
    url: str
    # Seven-character revision identifier read from the payload.
    revision: str
    contest: ContestId
    # Block number passed in by the caller alongside the payload.
    block: int

    @classmethod
    def decode(cls, decoder: Decoder, block: int) -> "Commitment":
        """Build a commitment from the remaining fields of an encoded payload.

        The caller is expected to have already consumed the leading spec
        version; this reads, in order: provider, repository, a 7-character
        revision and a uint16 contest id.

        This must be a classmethod: it is invoked as
        ``Commitment.decode(decoder, block)``, which without the decorator
        would bind ``decoder`` to ``cls`` and fail with a ``TypeError``.

        Args:
            decoder: Reader positioned just after the spec version.
            block: Block number to record on the commitment.

        Returns:
            The decoded commitment.

        Raises:
            ValueError: If the contest id is not a known ``ContestId``.
        """
        provider = decoder.read_str()
        repository = decoder.read_str()
        revision = decoder.read_sized_str(7)
        contest_id = ContestId(decoder.read_uint16())

        return cls(
            url=f"https://{provider}/{repository}",
            revision=revision,
            contest=contest_id,
            block=block,
        )
# Commitment payloads whose leading uint16 differs from this are skipped.
SPEC_VERSION = 8
# Subnet id used when building storage keys and syncing the metagraph.
NET_UID = 39

# Module-level caches, mutated in place by the fetch_*/sync_chain functions.
# Miner hotkey -> [(validator hotkey, weight)], rebuilt by fetch_weights().
WEIGHTS_BY_MINER: dict[Key, list[tuple[Key, Weight]]] = {}
# Hotkey -> identity name, rebuilt by fetch_identities().
VALIDATOR_IDENTITIES: dict[Key, str] = {}
# Neuron uid -> hotkey, filled in by sync_chain().
HOTKEYS_BY_UID: dict[Uid, Key] = {}

# Shared chain connection; replaced with a fresh one after a failed query.
substrate = get_substrate(subtensor_address=constants.FINNEY_SUBTENSOR_ADDRESS)
def query_subtensor(storage_keys: list[StorageKey], block: int) -> list:
    """Run a multi-key storage query against the chain state at ``block``.

    On any failure the shared ``substrate`` connection is recreated against
    the same URL so the next call starts fresh, and the original exception
    is re-raised to the caller.

    Args:
        storage_keys: Keys to read in a single ``query_multi`` call.
        block: Block number whose state should be queried.

    Returns:
        Whatever ``query_multi`` returns for the given keys.
    """
    global substrate

    try:
        block_hash = substrate.get_block_hash(block)
        result = substrate.query_multi(
            storage_keys=storage_keys,
            block_hash=block_hash,
        )
    except Exception:
        # Reconnect before propagating; the failed connection is discarded.
        substrate = get_substrate(subtensor_address=substrate.url)
        raise
    else:
        return result
def fetch_weights(block: int):
    """Rebuild ``WEIGHTS_BY_MINER`` from the weights validators set at ``block``.

    For every neuron, the resulting list holds one ``(validator_hotkey,
    weight)`` pair per validator that has a non-empty weight vector, in the
    order the chain query returned them. A validator that did not weight the
    neuron contributes ``0.0``. Neurons get no entry at all when no validator
    has any weights.

    The neuron list is fetched once so the storage keys and the resulting
    table describe the same snapshot, and the cache is only replaced after
    the whole table has been computed, so a failure leaves the previous
    contents intact.

    Args:
        block: Block number whose ``SubtensorModule.Weights`` are read.

    Raises:
        KeyError: If a validator uid is missing from ``HOTKEYS_BY_UID``.
        Exception: Whatever ``get_neurons`` or ``query_subtensor`` raise.
    """
    neurons = get_neurons()

    storage_keys: list[StorageKey] = [
        substrate.create_storage_key(
            "SubtensorModule",
            "Weights",
            [NET_UID, neuron.uid],
        )
        for neuron in neurons.values()
        if neuron.validator_permit
    ]

    weights = query_subtensor(storage_keys, block)

    # Resolve each validator's hotkey and index its weights by miner uid once,
    # instead of re-scanning every weight vector for every neuron.
    weights_by_validator: list[tuple[Key, dict[Uid, Weight]]] = []
    for storage, validator_weights in weights:
        if not validator_weights:
            continue

        validator_hotkey = HOTKEYS_BY_UID[storage.params[1]]
        weight_by_uid: dict[Uid, Weight] = {}
        for miner_weight in validator_weights:
            # setdefault keeps the first entry for a uid, as the original scan did.
            # Raw weights are u16 values; scale them into the 0..1 range.
            weight_by_uid.setdefault(miner_weight[0].value, miner_weight[1].value / 2 ** 16)
        weights_by_validator.append((validator_hotkey, weight_by_uid))

    fresh: dict[Key, list[tuple[Key, Weight]]] = {}
    if weights_by_validator:
        for hotkey, neuron in neurons.items():
            fresh[hotkey] = [
                (validator_hotkey, weight_by_uid.get(neuron.uid, 0.0))
                for validator_hotkey, weight_by_uid in weights_by_validator
            ]

    # Mutate in place: other modules may hold a reference to this dict.
    WEIGHTS_BY_MINER.clear()
    WEIGHTS_BY_MINER.update(fresh)
def fetch_identities(block: int):
    """Rebuild ``VALIDATOR_IDENTITIES`` from on-chain identities at ``block``.

    Identities are queried by coldkey for every neuron holding a validator
    permit. Any neuron whose coldkey matches a queried identity is then
    mapped from its hotkey to the identity's ``name``.

    The cache is only replaced after the query has succeeded and the new
    table has been built, so a failed query no longer wipes the identities
    that were already known. The neuron list is fetched once so the keys and
    the result describe the same snapshot.

    Args:
        block: Block number whose ``SubtensorModule.Identities`` are read.

    Raises:
        Exception: Whatever ``get_neurons`` or ``query_subtensor`` raise.
    """
    neurons = get_neurons()

    storage_keys: list[StorageKey] = [
        substrate.create_storage_key(
            "SubtensorModule",
            "Identities",
            [neuron.coldkey],
        )
        for neuron in neurons.values()
        if neuron.validator_permit
    ]

    identities = query_subtensor(storage_keys, block)

    # Index results by coldkey; the first result for a coldkey wins, matching
    # the original first-match scan. Duplicate keys at the same block carry
    # the same value anyway.
    info_by_coldkey = {}
    for storage, info in identities:
        info_by_coldkey.setdefault(storage.params[0], info)

    fresh: dict[Key, str] = {}
    for hotkey, neuron in neurons.items():
        if neuron.coldkey not in info_by_coldkey:
            continue
        info = info_by_coldkey[neuron.coldkey]
        # Deliberately `!=` rather than `is not`: the query result is a wrapper
        # object, so an identity check against None would always pass.
        if info != None:  # noqa
            fresh[hotkey] = info.value["name"]

    # Mutate in place: other modules may hold a reference to this dict.
    VALIDATOR_IDENTITIES.clear()
    VALIDATOR_IDENTITIES.update(fresh)
# Module-level mutex. Nothing visible in this file acquires it.
# NOTE(review): presumably meant to guard a cached fetch — confirm against callers.
lock = Lock()
def fetch_commitments() -> dict[Key, Commitment]:
    """Fetch and decode the current commitments of all non-validator nodes.

    Opens a dedicated chain connection, syncs the subnet metagraph, and reads
    ``Commitments.CommitmentOf`` at the current block for every node whose
    validator trust is not positive. Entries that are empty, carry a
    different spec version, or fail to decode are skipped; decode failures
    are printed rather than raised so one bad entry cannot hide the rest.

    Returns:
        Mapping from hotkey to its decoded commitment.
    """
    # A separate connection, deliberately not the module-level `substrate`.
    client = get_substrate(subtensor_address=constants.FINNEY_SUBTENSOR_ADDRESS)
    block = client.get_block_number(None)  # type: ignore

    metagraph = Metagraph(substrate=client, netuid=NET_UID, load_old_nodes=False)
    metagraph.sync_nodes()

    print("Fetching commitments...")

    storage_keys: list[StorageKey] = [
        client.create_storage_key(
            "Commitments",
            "CommitmentOf",
            [NET_UID, hotkey],
        )
        for hotkey, node in metagraph.nodes.items()
        # Nodes with positive validator trust are skipped.
        if not node.vtrust > 0
    ]

    results = client.query_multi(
        storage_keys=storage_keys,
        block_hash=client.get_block_hash(block),
    )

    commitments: dict[Key, Commitment] = {}
    for storage, commitment in results:
        try:
            if not commitment:
                continue

            # Raw payload: first value of the first field of the commitment info.
            field = bytes(next(iter(commitment["info"]["fields"][0][0].values()))[0])

            decoder = Decoder(field)
            spec_version = decoder.read_uint16()
            if spec_version != SPEC_VERSION:
                continue

            commitments[storage.params[1]] = Commitment.decode(decoder, int(commitment["block"]))
        except Exception:
            # Best effort per entry. `Exception` rather than a bare except so
            # KeyboardInterrupt/SystemExit still propagate.
            traceback.print_exc()

    return commitments
# Timestamps of the last chain sync and the last identity sync. Both start at
# the Unix epoch so the first sync_chain() call is never throttled.
last_sync: datetime = datetime.fromtimestamp(0, tz=TIMEZONE)
last_identity_sync: datetime = datetime.fromtimestamp(0, tz=TIMEZONE)
def get_neurons() -> dict[Key, Neuron]:
    """Fetch the latest metagraph of the subnet from the taostats API.

    Performs one HTTP request per call; nothing is cached here.

    Returns:
        Mapping from hotkey (ss58) to the parsed neuron entry.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not respond in time.
        KeyError: If the response is missing an expected field.
    """
    response = requests.get(
        "https://api.taostats.io/api/metagraph/latest/v1",
        headers={
            "accept": "application/json",
            "Authorization": TAOSTATS_API_KEY,
        },
        params={
            # Use the module constant instead of repeating the literal id.
            "netuid": NET_UID,
        },
        # requests never times out by default; bound the wait so an
        # unresponsive API cannot block the caller forever.
        timeout=30,
    )
    response.raise_for_status()

    neurons: dict[Key, Neuron] = {}
    for entry in response.json()["data"]:
        hotkey = entry["hotkey"]["ss58"]
        neurons[hotkey] = Neuron(
            hotkey=hotkey,
            coldkey=entry["coldkey"]["ss58"],
            validator_trust=float(entry["validator_trust"]),
            validator_permit=bool(entry["validator_permit"]),
            incentive=float(entry["incentive"]),
            updated=int(entry["updated"]),
            uid=int(entry["uid"]),
            block_number=int(entry["block_number"]),
        )
    return neurons
def get_latest_block():
    """Return the highest ``block_number`` reported by any neuron.

    Raises:
        ValueError: If ``get_neurons()`` returns no neurons.
    """
    neurons = get_neurons()
    return max(entry.block_number for entry in neurons.values())
def sync_chain():
    """Refresh the module-level chain caches, at most once every five minutes.

    Updates ``HOTKEYS_BY_UID`` and the validator weights on every run, and
    the validator identities at most once a day. Errors are printed rather
    than raised, and the shared ``substrate`` connection is recreated so the
    next attempt starts from a fresh connection.
    """
    global substrate, last_sync, last_identity_sync

    now = datetime.now(TIMEZONE)
    if now - last_sync < timedelta(minutes=5):
        return
    # Recorded before the attempt so failures are throttled as well.
    last_sync = now

    try:
        print("Syncing chain...")

        block = get_latest_block()

        # fetch_weights() resolves validator uids through this map.
        for hotkey, neuron in get_neurons().items():
            HOTKEYS_BY_UID[neuron.uid] = hotkey

        fetch_weights(block)

        if now - last_identity_sync > timedelta(days=1):
            print("Syncing identities...")
            fetch_identities(block)
            # Only record success: a failed identity sync is retried on the
            # next chain sync instead of being postponed for a whole day.
            last_identity_sync = now
    except Exception:
        print("Error occurred while syncing chain")
        traceback.print_exc()
        substrate = SubstrateInterface(substrate.url)