| import json |
| import logging |
| import os |
| from functools import partial |
| from pathlib import Path |
| from tempfile import TemporaryDirectory |
| from typing import Optional, Union |
|
|
| import torch |
| from torch.hub import HASH_REGEX, download_url_to_file, urlparse |
|
|
| try: |
| from torch.hub import get_dir |
| except ImportError: |
| from torch.hub import _get_torch_home as get_dir |
|
|
| from custom_timm import __version__ |
|
|
| try: |
| from huggingface_hub import (create_repo, get_hf_file_metadata, |
| hf_hub_download, hf_hub_url, |
| repo_type_and_id_from_hf_id, upload_folder) |
| from huggingface_hub.utils import EntryNotFoundError |
| hf_hub_download = partial(hf_hub_download, library_name="timm", library_version=__version__) |
| _has_hf_hub = True |
| except ImportError: |
| hf_hub_download = None |
| _has_hf_hub = False |
|
|
| _logger = logging.getLogger(__name__) |
|
|
|
|
| def get_cache_dir(child_dir=''): |
| """ |
| Returns the location of the directory where models are cached (and creates it if necessary). |
| """ |
| |
| if os.getenv('TORCH_MODEL_ZOO'): |
| _logger.warning('TORCH_MODEL_ZOO is deprecated, please use env TORCH_HOME instead') |
|
|
| hub_dir = get_dir() |
| child_dir = () if not child_dir else (child_dir,) |
| model_dir = os.path.join(hub_dir, 'checkpoints', *child_dir) |
| os.makedirs(model_dir, exist_ok=True) |
| return model_dir |
|
|
|
|
| def download_cached_file(url, check_hash=True, progress=False): |
| parts = urlparse(url) |
| filename = os.path.basename(parts.path) |
| cached_file = os.path.join(get_cache_dir(), filename) |
| if not os.path.exists(cached_file): |
| _logger.info('Downloading: "{}" to {}\n'.format(url, cached_file)) |
| hash_prefix = None |
| if check_hash: |
| r = HASH_REGEX.search(filename) |
| hash_prefix = r.group(1) if r else None |
| download_url_to_file(url, cached_file, hash_prefix, progress=progress) |
| return cached_file |
|
|
|
|
| def has_hf_hub(necessary=False): |
| if not _has_hf_hub and necessary: |
| |
| raise RuntimeError( |
| 'Hugging Face hub model specified but package not installed. Run `pip install huggingface_hub`.') |
| return _has_hf_hub |
|
|
|
|
| def hf_split(hf_id): |
| |
| rev_split = hf_id.split('@') |
| assert 0 < len(rev_split) <= 2, 'hf_hub id should only contain one @ character to identify revision.' |
| hf_model_id = rev_split[0] |
| hf_revision = rev_split[-1] if len(rev_split) > 1 else None |
| return hf_model_id, hf_revision |
|
|
|
|
| def load_cfg_from_json(json_file: Union[str, os.PathLike]): |
| with open(json_file, "r", encoding="utf-8") as reader: |
| text = reader.read() |
| return json.loads(text) |
|
|
|
|
| def _download_from_hf(model_id: str, filename: str): |
| hf_model_id, hf_revision = hf_split(model_id) |
| return hf_hub_download(hf_model_id, filename, revision=hf_revision) |
|
|
|
|
| def load_model_config_from_hf(model_id: str): |
| assert has_hf_hub(True) |
| cached_file = _download_from_hf(model_id, 'config.json') |
| pretrained_cfg = load_cfg_from_json(cached_file) |
| pretrained_cfg['hf_hub_id'] = model_id |
| pretrained_cfg['source'] = 'hf-hub' |
| model_name = pretrained_cfg.get('architecture') |
| return pretrained_cfg, model_name |
|
|
|
|
| def load_state_dict_from_hf(model_id: str, filename: str = 'pytorch_model.bin'): |
| assert has_hf_hub(True) |
| cached_file = _download_from_hf(model_id, filename) |
| state_dict = torch.load(cached_file, map_location='cpu') |
| return state_dict |
|
|
|
|
| def save_for_hf(model, save_directory, model_config=None): |
| assert has_hf_hub(True) |
| model_config = model_config or {} |
| save_directory = Path(save_directory) |
| save_directory.mkdir(exist_ok=True, parents=True) |
|
|
| weights_path = save_directory / 'pytorch_model.bin' |
| torch.save(model.state_dict(), weights_path) |
|
|
| config_path = save_directory / 'config.json' |
| hf_config = model.pretrained_cfg |
| hf_config['num_classes'] = model_config.pop('num_classes', model.num_classes) |
| hf_config['num_features'] = model_config.pop('num_features', model.num_features) |
| hf_config['labels'] = model_config.pop('labels', [f"LABEL_{i}" for i in range(hf_config['num_classes'])]) |
| hf_config.update(model_config) |
|
|
| with config_path.open('w') as f: |
| json.dump(hf_config, f, indent=2) |
|
|
|
|
| def push_to_hf_hub( |
| model, |
| repo_id: str, |
| commit_message: str ='Add model', |
| token: Optional[str] = None, |
| revision: Optional[str] = None, |
| private: bool = False, |
| create_pr: bool = False, |
| model_config: Optional[dict] = None, |
| ): |
| |
| repo_url = create_repo(repo_id, token=token, private=private, exist_ok=True) |
|
|
| |
| |
| _, repo_owner, repo_name = repo_type_and_id_from_hf_id(repo_url) |
| repo_id = f"{repo_owner}/{repo_name}" |
|
|
| |
| try: |
| get_hf_file_metadata(hf_hub_url(repo_id=repo_id, filename="README.md", revision=revision)) |
| has_readme = True |
| except EntryNotFoundError: |
| has_readme = False |
|
|
| |
| with TemporaryDirectory() as tmpdir: |
| |
| save_for_hf(model, tmpdir, model_config=model_config) |
|
|
| |
| if not has_readme: |
| readme_path = Path(tmpdir) / "README.md" |
| readme_text = f'---\ntags:\n- image-classification\n- timm\nlibrary_tag: timm\n---\n# Model card for {repo_id}' |
| readme_path.write_text(readme_text) |
|
|
| |
| return upload_folder( |
| repo_id=repo_id, |
| folder_path=tmpdir, |
| revision=revision, |
| create_pr=create_pr, |
| commit_message=commit_message, |
| ) |
|
|