Spaces:
Runtime error
Runtime error
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # | |
| # This source code is licensed under the Chameleon License found in the | |
| # LICENSE file in the root directory of this source tree. | |
| import glob | |
| import inspect | |
| import json | |
| from pathlib import Path | |
| import torch | |
| from chameleon.inference.transformer import ModelArgs, Transformer | |
| def _convert(model_args: ModelArgs, consolidated_path: Path) -> Transformer: | |
| old_default_dtype = torch.get_default_dtype() | |
| torch.set_default_dtype(torch.bfloat16) | |
| model = Transformer(model_args) | |
| transfer_results = model.load_state_dict( | |
| torch.load(str(consolidated_path), map_location='cuda'), | |
| strict=False, | |
| ) | |
| # TODO: More generally, assert missing or unexpected keys are buffers. | |
| assert transfer_results.missing_keys == [] | |
| assert transfer_results.unexpected_keys == ["rope.freqs"] | |
| model.eval() | |
| torch.set_default_dtype(old_default_dtype) | |
| return model | |
| def _get_checkpoint_path(src_dir: Path, rank: int | None) -> Path: | |
| base_path = src_dir / "consolidated.pth" | |
| if not rank and base_path.exists(): | |
| return base_path | |
| alt_path = src_dir / f"consolidated.{rank:02}.pth" | |
| if alt_path.exists(): | |
| return alt_path | |
| raise ValueError("Consolidated checkpoint not found.") | |
| def load_model(path: str, rank: int | None = None) -> Transformer: | |
| src_dir = Path(path) | |
| with open(src_dir / "params.json", "r") as f: | |
| params = json.loads(f.read()) | |
| with open(src_dir / "consolidate_params.json", "r") as f: | |
| consolidate_params = json.loads(f.read()) | |
| params = {**params, **params["model"], **consolidate_params} | |
| known_param = inspect.signature(ModelArgs.__init__).parameters | |
| filtered_params = {k: v for k, v in params.items() if k in known_param} | |
| return _convert( | |
| ModelArgs(**filtered_params), | |
| _get_checkpoint_path(src_dir, rank), | |
| ) | |
| def detect_shard_count(path: str) -> int: | |
| src_dir = Path(path) | |
| if (src_dir / "consolidated.pth").exists(): | |
| return 1 | |
| return len(glob.glob(str(src_dir / "consolidated.*.pth"))) | |