| | import json |
| | import os |
| | import re |
| | from collections import defaultdict |
| | from datetime import datetime, timedelta, timezone |
| |
|
| | import huggingface_hub |
| | from huggingface_hub import ModelCard, login |
| | from huggingface_hub.hf_api import ModelInfo, get_safetensors_metadata |
| | from transformers import AutoConfig, AutoTokenizer |
| |
|
| | from src.envs import HAS_HIGHER_RATE_LIMIT, HF_TOKEN |
| | from huggingface_hub import hf_hub_download, HfFileSystem |
| | from huggingface_hub.utils import validate_repo_id |
| | from pathlib import Path |
| | import fnmatch |
| | from huggingface_hub.hf_api import get_hf_file_metadata, hf_hub_url |
| |
|
| | login(token=HF_TOKEN) |
| |
|
| | |
| | |
| | def check_model_card(repo_id: str) -> tuple[bool, str]: |
| | |
| | try: |
| | card = ModelCard.load(repo_id) |
| | except huggingface_hub.utils.EntryNotFoundError: |
| | return False, "Please add a model card to your model to explain how you trained/fine-tuned it.", None |
| |
|
| | |
| | if card.data.license is None: |
| | if not ("license_name" in card.data and "license_link" in card.data): |
| | return False, ( |
| | "License not found. Please add a license to your model card using the `license` metadata or a" |
| | " `license_name`/`license_link` pair." |
| | ), None |
| |
|
| | |
| | if len(card.text) < 200: |
| | return False, "Please add a description to your model card, it is too short.", None |
| |
|
| | return True, "", card |
| |
|
| |
|
| | def is_model_on_hub(model_name: str, revision: str, token: str = None, trust_remote_code=True, test_tokenizer=False) -> tuple[bool, str, AutoConfig]: |
| | try: |
| | config = AutoConfig.from_pretrained(model_name, revision=revision, trust_remote_code=trust_remote_code, token=token) |
| | if test_tokenizer: |
| | try: |
| | tk = AutoTokenizer.from_pretrained(model_name, revision=revision, trust_remote_code=trust_remote_code, token=token) |
| | except ValueError as e: |
| | return ( |
| | False, |
| | f"uses a tokenizer which is not in a transformers release: {e}", |
| | None |
| | ) |
| | except Exception as e: |
| | import traceback |
| | print(traceback.format_exc()) |
| | return (False, "'s tokenizer cannot be loaded. Is your tokenizer class in a stable transformers release, and correctly configured?", None) |
| | return True, None, config |
| |
|
| | except ValueError as e: |
| | return ( |
| | False, |
| | "needs to be launched with `trust_remote_code=True`. For safety reason, we do not allow these models to be automatically submitted to the leaderboard.", |
| | None |
| | ) |
| |
|
| | except Exception as e: |
| | if "You are trying to access a gated repo." in str(e): |
| | return True, "uses a gated model.", None |
| | return False, f"was not found or misconfigured on the hub! Error raised was {e.args[0]}", None |
| |
|
| |
|
| | def get_model_size(model_info: ModelInfo, precision: str): |
| | size_pattern = re.compile(r"(\d+\.)?\d+(b|m)") |
| | safetensors = None |
| | try: |
| | safetensors = get_safetensors_metadata(model_info.id) |
| | num_parameters = 0 |
| | mem = 0 |
| | for key in safetensors.parameter_count: |
| | if key in ["F16", "BF16"]: |
| | mem += safetensors.parameter_count[key] * 2 |
| | else: |
| | mem += safetensors.parameter_count[key] * 4 |
| |
|
| | num_parameters += safetensors.parameter_count[key] |
| |
|
| | params_b = round(num_parameters / 1e9, 2) |
| | size_gb = round(mem / 1e9,2) |
| | return params_b, size_gb |
| | except Exception as e: |
| | print(str(e)) |
| |
|
| | if safetensors is not None: |
| | model_size = round(sum(safetensors.parameter_count.values()) / 1e9, 3) |
| | else: |
| | try: |
| | size_match = re.search(size_pattern, model_info.id.lower()) |
| | model_size = size_match.group(0) |
| | model_size = round(float(model_size[:-1]) if model_size[-1] == "b" else float(model_size[:-1]) / 1e3, 3) |
| | except AttributeError as e: |
| | return 0 |
| |
|
| | |
| | |
| | if precision == "16bit": |
| | size_gb = model_size * 2 |
| | else: |
| | size_gb = model_size * 4 |
| | return model_size, size_gb |
| |
|
| | KNOWN_SIZE_FACTOR = { |
| | "gptq": {"4bit": 8, "8bit": 4, "2bit": 8, "3bit": 12}, |
| | "awq": {"4bit": 8}, |
| | "bitsandbytes": {"4bit": 2}, |
| | "aqlm": {"4bit": 8, "8bit": 4, "2bit": 8, "3bit": 6}, |
| | } |
| |
|
| | BYTES = { |
| | "I32": 4, |
| | "I16": 2, |
| | "I8": 1, |
| | "F16": 2, |
| | "BF16": 2, |
| | "F32": 4, |
| | "U8": 1} |
| |
|
| |
|
| | def get_quantized_model_parameters_memory(model_info: ModelInfo, quant_method="", bits="4bit"): |
| | try: |
| | safetensors = get_safetensors_metadata(model_info.id) |
| | num_parameters = 0 |
| | mem = 0 |
| | for key in safetensors.parameter_count: |
| | mem += safetensors.parameter_count[key] * BYTES[key] |
| | if key in ["I32", "U8", "I16", "I8"]: |
| | param = safetensors.parameter_count[key] * KNOWN_SIZE_FACTOR[quant_method][bits] |
| | if key == "I8": |
| | param = param / 2 |
| | num_parameters += param |
| |
|
| | params_b = round(num_parameters / 1e9, 2) |
| | size_gb = round(mem / 1e9,2) |
| | return params_b, size_gb |
| | except Exception as e: |
| | print(str(e)) |
| |
|
| | filenames = [sib.rfilename for sib in model_info.siblings] |
| | if "pytorch_model.bin" in filenames or "model.safetensors" in filenames: |
| | bin_filename = "pytorch_model.bin" if "pytorch_model.bin" in filenames else "model.safetensors" |
| | url = hf_hub_url(model_info.id, filename=bin_filename) |
| | meta = get_hf_file_metadata(url) |
| | params_b = round(meta.size * 2 / 1e9, 2) |
| | size_gb = round(meta.size / 1e9, 2) |
| | return params_b, size_gb |
| |
|
| | if "pytorch_model.bin.index.json" in filenames or "model.safetensors.index.json" in filenames: |
| | json_file_name = "pytorch_model.bin.index.json" if "pytorch_model.bin.index.json" in filenames else "model.safetensors.index.json" |
| | index_path = hf_hub_download(model_info.id, filename=json_file_name) |
| | """ |
| | { |
| | "metadata": { |
| | "total_size": 28272820224 |
| | },.... |
| | """ |
| | size = json.load(open(index_path)) |
| | bytes_per_param = 2 |
| | if ("metadata" in size) and ("total_size" in size["metadata"]): |
| | return round(size["metadata"]["total_size"] / bytes_per_param / 1e9, 2), \ |
| | round(size["metadata"]["total_size"] / 1e9, 2) |
| |
|
| | return None, None |
| |
|
| | def get_model_arch(model_info: ModelInfo): |
| | return model_info.config.get("architectures", "Unknown") |
| |
|
| | def user_submission_permission(org_or_user, users_to_submission_dates, rate_limit_period, rate_limit_quota): |
| | if org_or_user not in users_to_submission_dates: |
| | return True, "" |
| | submission_dates = sorted(users_to_submission_dates[org_or_user]) |
| |
|
| | time_limit = (datetime.now(timezone.utc) - timedelta(days=rate_limit_period)).strftime("%Y-%m-%dT%H:%M:%SZ") |
| | submissions_after_timelimit = [d for d in submission_dates if d > time_limit] |
| |
|
| | num_models_submitted_in_period = len(submissions_after_timelimit) |
| | if org_or_user in HAS_HIGHER_RATE_LIMIT: |
| | rate_limit_quota = 2 * rate_limit_quota |
| |
|
| | if num_models_submitted_in_period > rate_limit_quota: |
| | error_msg = f"Organisation or user `{org_or_user}`" |
| | error_msg += f"already has {num_models_submitted_in_period} model requests submitted to the leaderboard " |
| | error_msg += f"in the last {rate_limit_period} days.\n" |
| | error_msg += ( |
| | "Please wait a couple of days before resubmitting, so that everybody can enjoy using the leaderboard 🤗" |
| | ) |
| | return False, error_msg |
| | return True, "" |
| |
|
| |
|
| | def already_submitted_models(requested_models_dir: str) -> set[str]: |
| | depth = 1 |
| | file_names = [] |
| | users_to_submission_dates = defaultdict(list) |
| |
|
| | for root, _, files in os.walk(requested_models_dir): |
| | current_depth = root.count(os.sep) - requested_models_dir.count(os.sep) |
| | if current_depth == depth: |
| | for file in files: |
| | if not file.endswith(".json"): |
| | continue |
| | with open(os.path.join(root, file), "r") as f: |
| | info = json.load(f) |
| | |
| | quant_type = info.get("quant_type", "None") |
| | weight_dtype = info.get("weight_dtype", "None") |
| | compute_dtype = info.get("compute_dtype", "None") |
| | file_names.append(f"{info['model']}_{info['revision']}_{quant_type}_{info['precision']}_{weight_dtype}_{compute_dtype}") |
| |
|
| | |
| | if info["model"].count("/") == 0 or "submitted_time" not in info: |
| | continue |
| |
|
| | try: |
| | organisation, _ = info["model"].split("/") |
| | except: |
| | print(info["model"]) |
| | organisation = "local" |
| | users_to_submission_dates[organisation].append(info["submitted_time"]) |
| |
|
| | return set(file_names), users_to_submission_dates |
| |
|
| | def get_model_tags(model_card, model: str): |
| | is_merge_from_metadata = False |
| | is_moe_from_metadata = False |
| |
|
| | tags = [] |
| | if model_card is None: |
| | return tags |
| | if model_card.data.tags: |
| | is_merge_from_metadata = any([tag in model_card.data.tags for tag in ["merge", "moerge", "mergekit", "lazymergekit"]]) |
| | is_moe_from_metadata = any([tag in model_card.data.tags for tag in ["moe", "moerge"]]) |
| |
|
| | is_merge_from_model_card = any(keyword in model_card.text.lower() for keyword in ["merged model", "merge model", "moerge"]) |
| | if is_merge_from_model_card or is_merge_from_metadata: |
| | tags.append("merge") |
| | is_moe_from_model_card = any(keyword in model_card.text.lower() for keyword in ["moe", "mixtral"]) |
| | is_moe_from_name = "moe" in model.lower().replace("/", "-").replace("_", "-").split("-") |
| | if is_moe_from_model_card or is_moe_from_name or is_moe_from_metadata: |
| | tags.append("moe") |
| |
|
| | return tags |
| |
|
| | def is_gguf_on_hub(repo_id: str, filename="*Q4_0.gguf"): |
| |
|
| | validate_repo_id(repo_id) |
| |
|
| | hffs = HfFileSystem() |
| |
|
| | files = [ |
| | file["name"] if isinstance(file, dict) else file |
| | for file in hffs.ls(repo_id) |
| | ] |
| |
|
| | |
| | file_list: List[str] = [] |
| | for file in files: |
| | rel_path = Path(file).relative_to(repo_id) |
| | file_list.append(str(rel_path)) |
| |
|
| | print(file_list) |
| |
|
| | matching_files = [file for file in file_list if fnmatch.fnmatch(file, filename)] |
| | if len(matching_files) > 0: |
| | return True, None, matching_files, None |
| |
|
| | matching_files = [file for file in file_list if fnmatch.fnmatch(file, filename.lower())] |
| |
|
| | if len(matching_files) > 0: |
| | return True, None, matching_files, filename.lower() |
| | else: |
| | return False, f"the model {repo_id} don't contains any {filename}.", None, None |
| |
|