Spaces:
Runtime error
Runtime error
| import argparse | |
| import datetime | |
| import functools | |
| import json | |
| import math | |
| import os | |
| import time | |
| import traceback | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import bittensor as bt | |
| import numpy as np | |
| import pandas as pd | |
| import wandb | |
| from bittensor.extrinsics.serving import get_metadata | |
| from dotenv import load_dotenv | |
| from wandb.apis.public.history import HistoryScan, SampledHistoryScan | |
| from competitions import COMP_NAME_TO_ID | |
# Netuid of the finetuning subnet queried by this module.
NETUID = 37

# Retry policy used by run_with_retries: seconds to wait between attempts,
# and the total number of attempts.
DELAY_SECS = 3
RETRIES = 3

# Must run before the environment lookups below so values supplied through
# python-dotenv are visible to them.
load_dotenv()

# Both are None when the corresponding environment variable is unset.
WANDB_TOKEN = os.environ.get("WANDB_API_KEY")
SUBTENSOR_ENDPOINT = os.environ.get("SUBTENSOR_ENDPOINT")

# Wandb projects holding validator runs and benchmark runs respectively.
VALIDATOR_WANDB_PROJECT = "rusticluftig/finetuning"
BENCHMARK_WANDB_PROJECT = "rusticluftig/test-benchmarks"
@dataclass
class ModelData:
    """A miner's committed model plus the metagraph stats attached to its UID.

    The model identity fields (namespace, name, commit, secure_hash,
    competition_id) are parsed from the colon-separated commitment string;
    the remaining fields are supplied by the caller.
    """

    uid: int
    hotkey: str
    competition_id: int
    namespace: str
    name: str
    commit: str

    # Hash of (hash(model) + hotkey)
    secure_hash: str
    block: int
    incentive: float
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ) -> "ModelData":
        """Returns an instance of this class from a compressed string representation.

        Args:
            uid (int): UID the commitment belongs to.
            hotkey (str): Hotkey registered for that UID.
            cs (str): Compressed string laid out as
                "namespace:name:commit:secure_hash:competition_id".
            block (int): Block associated with the commitment.
            incentive (float): Incentive value for the UID.
            emission (float): Emission value for the UID.

        Raises:
            IndexError: If `cs` has fewer than five ":"-separated tokens.
            ValueError: If the competition id token is not an integer.
        """
        tokens = cs.split(":")
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2],
            secure_hash=tokens[3],
            competition_id=int(tokens[4]),
            block=block,
            incentive=incentive,
            emission=emission,
        )
def run_with_retries(func, *args, **kwargs):
    """Calls `func(*args, **kwargs)`, retrying when it raises.

    Up to RETRIES attempts are made, sleeping DELAY_SECS between them. Each
    failure is printed with its traceback; the exception from the final
    attempt is re-raised to the caller.
    """
    attempts_left = RETRIES
    while attempts_left > 0:
        attempts_left -= 1
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"Failed to run function: {traceback.format_exc()}")
            if attempts_left == 0:
                raise
            time.sleep(DELAY_SECS)
    # Only reachable when RETRIES is not positive, i.e. no attempt was made.
    raise RuntimeError("Should never happen")
def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Connects to the chain and fetches the full metagraph for NETUID.

    Uses SUBTENSOR_ENDPOINT as the chain endpoint when it is set, otherwise
    connects to "finney". The whole connect-and-fetch sequence goes through
    run_with_retries.
    """

    def _connect() -> Tuple[bt.subtensor, bt.metagraph]:
        if not SUBTENSOR_ENDPOINT:
            chain = bt.subtensor("finney")
        else:
            # Build a bittensor config that points at the custom endpoint.
            arg_parser = argparse.ArgumentParser()
            bt.subtensor.add_args(arg_parser)
            chain_config = bt.config(
                parser=arg_parser,
                args=["--subtensor.chain_endpoint", SUBTENSOR_ENDPOINT],
            )
            chain = bt.subtensor(config=chain_config)
        return chain, chain.metagraph(NETUID, lite=False)

    return run_with_retries(_connect)
def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Builds a ModelData entry for every UID with a parseable commitment.

    Args:
        subtensor (bt.subtensor): Chain connection used to fetch commitment metadata.
        metagraph (bt.metagraph): Metagraph supplying UIDs, hotkeys, incentive and emission.

    Returns:
        List[ModelData]: One entry per UID whose metadata could be fetched,
        decoded and parsed. UIDs failing any of those steps are skipped so a
        single bad commitment cannot abort the whole load.
    """
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]

        metadata = None
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            # `Exception` rather than a bare except, so Ctrl-C and SystemExit
            # still propagate to the caller.
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")
        if not metadata:
            continue

        try:
            commitment = metadata["info"]["fields"][0]
            # The first value of the commitment is a hex string; [2:] drops
            # its two-character prefix before decoding.
            hex_data = list(commitment.values())[0][2:]
            chain_str = bytes.fromhex(hex_data).decode()
            block = metadata["block"]
        except (KeyError, IndexError, TypeError, ValueError, AttributeError):
            print(
                f"Failed to decode commitment for UID {uid}: {traceback.format_exc()}"
            )
            continue

        incentive = np.nan_to_num(metagraph.incentive[uid]).item()
        emission = (
            np.nan_to_num(metagraph.emission[uid]).item() * 20
        )  # convert to daily TAO

        try:
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, block, incentive, emission
            )
        except Exception:
            # The commitment does not follow the expected layout; skip it
            # quietly, as before.
            continue

        result.append(model_data)
    return result
def get_wandb_runs(
    project: str, filters: Dict[str, Any], order: str = "-created_at"
) -> List:
    """Fetches runs from Wandb, polling until at least one run comes back.

    Args:
        project (str): The Wandb project to get runs from.
        filters (Dict[str, Any]): Filters to apply to the runs.
        order (str): Order to sort the runs by. Defaults to "-created_at" (newest first)

    Returns:
        List: Non-empty list of runs matching the provided filters.
    """
    while True:
        # A new API client is created for every attempt.
        client = wandb.Api(api_key=WANDB_TOKEN, timeout=100)
        found = list(client.runs(project, filters=filters, order=order))
        if found:
            return found
        # WandDB API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)
def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Collects the newest available score entry for each requested UID.

    Runs are walked newest-first. For each UID, the first run that mentions it
    supplies its scores. An entry is flagged "fresh" only when it comes from
    the first processed run of that run's competition.

    Args:
        uids (List[int]): List of UIDs to get scores for.
        wandb_runs (List): List of validator runs from Wandb. Requires the runs are provided in descending order.

    Returns:
        Dict mapping each UID found to its avg_loss, win_rate, win_total,
        weight, competition_id and fresh flag. UIDs never seen are absent.
    """
    # (output key, key inside the run's per-UID data)
    field_map = (
        ("avg_loss", "average_loss"),
        ("win_rate", "win_rate"),
        ("win_total", "win_total"),
        ("weight", "weight"),
        ("competition_id", "competition_id"),
    )
    scores = {}
    newer_timestamp = None
    competitions_seen = set()

    for run in wandb_runs:
        summary = run.summary
        if "original_format_json" not in summary:
            continue
        payload = json.loads(summary["original_format_json"])
        per_uid = payload["uid_data"]
        run_timestamp = payload["timestamp"]

        # Make sure runs are indeed in descending time order.
        assert (
            newer_timestamp is None or run_timestamp < newer_timestamp
        ), f"Timestamps are not in descending order: {run_timestamp} >= {newer_timestamp}"
        newer_timestamp = run_timestamp

        run_competition = payload.get("competition_id", None)
        # Only the most recent run per competition is fresh.
        fresh = run_competition not in competitions_seen

        for uid in uids:
            uid_key = str(uid)
            if uid in scores or uid_key not in per_uid:
                continue
            uid_data = per_uid[uid_key]
            entry = {out: uid_data.get(src, None) for out, src in field_map}
            entry["fresh"] = fresh
            scores[uid] = entry

        competitions_seen.add(run_competition)
        # Stop as soon as every requested UID has a score.
        if len(scores) == len(uids):
            break
    return scores
def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight}).

    A UID counts as a validator when its validator trust is positive and its
    stake exceeds 10,000. Weights are rounded to 4 decimals; a validator's
    weight on itself and weights that round to zero are left out.
    """
    all_uids = metagraph.uids.tolist()
    validators = {}
    for vali_uid in all_uids:
        vtrust = metagraph.validator_trust[vali_uid].item()
        stake = metagraph.stake[vali_uid].item()
        if not (vtrust > 0 and stake > 10_000):
            continue

        assigned = {}
        validators[vali_uid] = (vtrust, stake, assigned)
        for target_uid in all_uids:
            if target_uid == vali_uid:
                continue
            rounded = round(metagraph.weights[vali_uid][target_uid].item(), 4)
            if rounded > 0:
                assigned[target_uid] = rounded
    return validators
def get_losses_over_time(wandb_runs: List, competition_id: int) -> pd.DataFrame:
    """Returns a dataframe of the best average model loss over time.

    Each run contributes at most one datapoint: the lowest qualifying loss for
    `competition_id` across the run's last (up to) 10 history steps, stamped
    with the latest timestamp seen in those steps.

    NOTE(review): block nesting was reconstructed from flattened source;
    confirm against the canonical file.
    """
    timestamps = []
    losses = []
    for run in wandb_runs:
        # For each run, check the 10 most recent steps.
        last_step = run.lastHistoryStep
        recent_steps = SampledHistoryScan(
            run.client,
            run,
            ["original_format_json"],
            max(0, last_step - 10),
            last_step,
            page_size=10,
        )

        lowest_loss = math.inf
        found_loss = False
        latest_timestamp = None
        for step in recent_steps:
            payload = json.loads(step["original_format_json"])
            per_uid = payload["uid_data"]
            step_time = datetime.datetime.fromtimestamp(payload["timestamp"])
            if latest_timestamp is None:
                latest_timestamp = step_time
            else:
                latest_timestamp = max(latest_timestamp, step_time)

            for uid_data in per_uid.values():
                loss = uid_data.get("average_loss", math.inf)
                c_id = uid_data.get("competition_id", None)
                if c_id is None or c_id != competition_id:
                    continue
                # Filter out issue caused by wandb unavailability.
                if loss < 0.99 and loss < lowest_loss:
                    lowest_loss = loss
                    found_loss = True

        # Now that we've processed the run's most recent steps, check if we should add a datapoint.
        if found_loss:
            timestamps.append(latest_timestamp)
            losses.append(lowest_loss)

    return pd.DataFrame({"timestamp": timestamps, "losses": losses})
def is_floatable(x) -> bool:
    """Returns True for any int, or for a float that is neither NaN nor infinite."""
    if isinstance(x, int):
        return True
    if isinstance(x, float):
        return not (math.isnan(x) or math.isinf(x))
    return False
def format_score(uid: int, scores, key) -> Optional[float]:
    """Returns scores[uid][key] rounded to 4 decimals.

    Returns None when the UID or key is absent, or when the stored value is
    rejected by is_floatable.
    """
    try:
        value = scores[uid][key]
    except KeyError:
        return None
    return round(value, 4) if is_floatable(value) else None
def leaderboard_data(
    leaderboard: List[ModelData],
    scores: Dict[int, Dict[str, Optional[float]]],
    competition_id: int,
    show_stale: bool,
) -> List[List[Any]]:
    """Builds the leaderboard rows for a single competition.

    Each row is [markdown link to the model commit, win rate, average loss,
    weight, uid, block]. Models from other competitions are dropped. Unless
    `show_stale` is set, models without a fresh score entry are dropped too.
    """
    rows = []
    for entry in leaderboard:
        if entry.competition_id != competition_id:
            continue
        has_fresh_score = entry.uid in scores and scores[entry.uid]["fresh"]
        if not (has_fresh_score or show_stale):
            continue
        model_link = f"[{entry.namespace}/{entry.name} ({entry.commit[0:8]})](https://huggingface.co/{entry.namespace}/{entry.name}/commit/{entry.commit})"
        rows.append(
            [
                model_link,
                format_score(entry.uid, scores, "win_rate"),
                format_score(entry.uid, scores, "avg_loss"),
                format_score(entry.uid, scores, "weight"),
                entry.uid,
                entry.block,
            ]
        )
    return rows
def get_benchmarks() -> (
    Tuple[Optional[pd.DataFrame], Optional[Dict[str, Dict[str, float]]]]
):
    """Returns the benchmark results and the hard-coded reference scores.

    Returns:
        A `(dataframe, targets)` tuple, or `(None, None)` when no benchmark
        project is configured.
        The dataframe has one row per usable benchmark run (oldest first), with
        columns timestamp, uid, model, competition_id, mmlu and mmlu_pro.
        `targets` maps each benchmark name to {reference model name: score}.
    """
    if not BENCHMARK_WANDB_PROJECT:
        print("No benchmark project set.")
        return None, None

    runs = get_wandb_runs(
        project=BENCHMARK_WANDB_PROJECT, filters=None, order="+created_at"
    )
    required_keys = ("mmlu.acc,none", "mmlu_pro", "_timestamp")
    timestamps, uids, models, comp_ids, mmlu, mmlu_pro = [], [], [], [], [], []
    for run in runs:
        uid = run.config.get("uid", None)
        model = run.config.get("model", None)
        # UID 0 is a valid UID, so only a *missing* uid disqualifies the run;
        # a plain truthiness check would silently drop UID 0.
        if uid is None or not model:
            continue

        # Any run without a competition_id was for competition 2.
        comp_name = run.config.get("competition_id", "B7_MULTI_CHOICE")
        comp_id = COMP_NAME_TO_ID.get(comp_name, 2)

        # Only the first history sample of the run is used.
        samples = list(
            HistoryScan(
                run.client,
                run,
                0,
                1,
            )
        )
        if not samples:
            continue
        sample = samples[0]

        # Make sure we have all the required keys.
        if any(key not in sample for key in required_keys):
            continue

        comp_ids.append(comp_id)
        timestamps.append(datetime.datetime.fromtimestamp(sample["_timestamp"]))
        mmlu.append(sample["mmlu.acc,none"])
        mmlu_pro.append(sample["mmlu_pro"])
        uids.append(uid)
        models.append(model)

    benchmarks = pd.DataFrame(
        {
            "timestamp": timestamps,
            "uid": uids,
            "model": models,
            "competition_id": comp_ids,
            "mmlu": mmlu,
            "mmlu_pro": mmlu_pro,
        }
    )
    # Fixed reference scores for baseline models, keyed by benchmark name.
    targets = {
        "mmlu": {
            "Llama-3.1-8B-Instruct": 0.681,
            "Mistral-7B-Instruct-v0.3": 0.597,
            "gemma-2-9b-it": 0.719,
        },
        "mmlu_pro": {
            "Llama-3.1-8B-Instruct": 30.68,
            "Mistral-7B-Instruct-v0.3": 23.06,
            "gemma-2-9b-it": 31.95,
        },
    }
    return benchmarks, targets
def make_validator_dataframe(
    validator_df: pd.DataFrame, model_data: ModelData
) -> pd.DataFrame:
    """Builds a table of validators and the weights they set on incentivized models.

    `validator_df` is indexed as a mapping of validator UID to
    (vtrust, stake, {uid: weight}) and `model_data` is iterated as a
    collection of models. Rows are ordered by stake, highest first, and there
    is one weight column per model with non-zero incentive.
    """
    # Highest stake first.
    uids_by_stake = sorted(
        validator_df.keys(),
        key=lambda vali_uid: validator_df[vali_uid][1],
        reverse=True,
    )

    rows = []
    for vali_uid in uids_by_stake:
        stats = validator_df[vali_uid]
        row = [vali_uid, int(stats[1]), round(stats[0], 4)]
        row.extend(stats[-1].get(c.uid) for c in model_data if c.incentive)
        rows.append(row)

    dtypes = {"UID": int, "Stake (τ)": float, "V-Trust": float}
    for c in model_data:
        if c.incentive:
            dtypes[f"{c.namespace}/{c.name} ({c.commit[0:8]})"] = float

    return pd.DataFrame(rows, columns=dtypes.keys()).astype(dtypes)
def make_metagraph_dataframe(metagraph: bt.metagraph, weights=False) -> pd.DataFrame:
    """Flattens selected metagraph attributes into a dataframe.

    Adds the metagraph block, NETUID, a positional uid and each axon's
    hotkey/coldkey. When `weights` is truthy and `metagraph.W` is available,
    a "weights" column holding each row of W as a list is added as well.
    """
    attribute_names = (
        "stake",
        "emission",
        "trust",
        "validator_trust",
        "dividends",
        "incentive",
        "R",
        "consensus",
        "validator_permit",
    )
    columns = {}
    for attribute in attribute_names:
        columns[attribute] = getattr(metagraph, attribute)
    df = pd.DataFrame(columns)

    df["block"] = metagraph.block.item()
    df["netuid"] = NETUID
    df["uid"] = range(len(df))
    df["hotkey"] = [axon.hotkey for axon in metagraph.axons]
    df["coldkey"] = [axon.coldkey for axon in metagraph.axons]

    if weights and metagraph.W is not None:
        # convert NxN tensor to a list of lists so it fits into the dataframe
        df["weights"] = [row.tolist() for row in metagraph.W]
    return df
def load_state_vars() -> Dict[str, Any]:
    """Loads everything the dashboard needs, retrying until it all succeeds.

    Any failure is printed with its traceback and the whole load is retried
    after 30 seconds.

    Returns:
        Dict with keys "metagraph", "model_data" (sorted by incentive,
        highest first), "vali_runs", "scores", "validator_df",
        "benchmarks_df" and "benchmarks_targets".

    Raises:
        KeyboardInterrupt: Re-raised after logging. Returning here would
            reference values that were never loaded.
    """
    while True:
        try:
            subtensor, metagraph = get_subtensor_and_metagraph()
            print(f"Loaded subtensor and metagraph: {metagraph}")

            model_data: List[ModelData] = get_subnet_data(subtensor, metagraph)
            model_data.sort(key=lambda x: x.incentive, reverse=True)
            print(f"Loaded {len(model_data)} models")

            vali_runs = get_wandb_runs(
                project=VALIDATOR_WANDB_PROJECT,
                filters={
                    "$and": [{"config.type": "validator"}],
                    "$or": [{"config.uid": 28}, {"config.uid": 16}],
                },
            )
            print(f"Loaded {len(vali_runs)} validator runs")

            scores = get_scores([x.uid for x in model_data], vali_runs)
            print(f"Loaded {len(scores)} scores")

            validator_df = get_validator_weights(metagraph)
            print("Loaded validator weights")

            # NOTE: computing losses over time (get_losses_over_time) is
            # currently disabled.

            benchmarks_df, benchmarks_targets = get_benchmarks()
            print("Loaded benchmarks")
            break
        except KeyboardInterrupt:
            print("Exiting...")
            # Propagate instead of falling through to the return below, which
            # would fail on variables that were never assigned.
            raise
        except Exception:
            print(f"Failed to get data: {traceback.format_exc()}")
            time.sleep(30)

    return {
        "metagraph": metagraph,
        "model_data": model_data,
        "vali_runs": vali_runs,
        "scores": scores,
        "validator_df": validator_df,
        "benchmarks_df": benchmarks_df,
        "benchmarks_targets": benchmarks_targets,
    }