Spaces:
Build error
Build error
| # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import argparse | |
| import importlib | |
| import os | |
| import torch.distributed as dist | |
| from loguru import logger as logging | |
| from omegaconf import OmegaConf | |
| from cosmos_predict1.diffusion.config.config import Config | |
| from cosmos_predict1.utils import log, misc | |
| from cosmos_predict1.utils.config_helper import get_config_module, override | |
| from cosmos_predict1.utils.lazy_config import instantiate | |
| from cosmos_predict1.utils.lazy_config.lazy import LazyConfig | |
| from cosmos_predict1.utils.parallel_state_helper import is_tp_cp_pp_rank0 | |
def instantiate_model(config: Config, trainer):
    """Build the model described by ``config.model_obj`` and return it.

    ``config.model`` is attached to ``config.model_obj.config`` only for the
    duration of the ``instantiate`` call. When ``config.model.fsdp_enabled``
    is truthy, ``trainer.checkpointer`` is likewise attached as
    ``config.model_obj.fsdp_checkpointer``. Both attachments are reset to
    ``None`` afterwards, including when instantiation fails, so the shared
    config object is never left in a half-modified state.

    The random seed is set with ``by_rank=False`` before the model is built
    and with ``by_rank=True`` afterwards (presumably so that every rank builds
    identical initial weights and then diverges for data/noise sampling --
    confirm against ``misc.set_random_seed``).

    Args:
        config: The job config; ``trainer.seed``, ``trainer.distributed_parallelism``,
            ``model`` and ``model_obj`` are read from it.
        trainer: The trainer instance; only ``trainer.checkpointer`` is used,
            and only on the FSDP path.

    Returns:
        Whatever ``instantiate(config.model_obj)`` produces, i.e. the model.
        The return annotation is intentionally left off: the previous
        ``-> None`` was wrong, and the concrete model type is not visible here.

    Raises:
        AssertionError: If FSDP is enabled on the model but the trainer's
            ``distributed_parallelism`` is not ``"fsdp"``.
    """
    misc.set_random_seed(seed=config.trainer.seed, by_rank=False)
    config.model_obj.config = config.model
    try:
        if getattr(config.model, "fsdp_enabled", False):
            assert config.trainer.distributed_parallelism == "fsdp", "FSDP model is only supported with FSDP trainer"
            log.critical("FSDP enabled")
            config.model_obj.fsdp_checkpointer = trainer.checkpointer
            try:
                model = instantiate(config.model_obj)
            finally:
                # Detach the checkpointer even if instantiation raised.
                config.model_obj.fsdp_checkpointer = None
        else:
            model = instantiate(config.model_obj)
    finally:
        # Always undo the temporary attachment made above.
        config.model_obj.config = None
    misc.set_random_seed(seed=config.trainer.seed, by_rank=True)
    return model
def destroy_distributed():
    """Tear down the default ``torch.distributed`` process group, if any.

    Does nothing beyond logging when distributed support is unavailable or no
    process group has been initialized. A ``ValueError`` raised by the
    teardown is reported on stdout and otherwise ignored.
    """
    log.info("Destroying distributed environment...")
    if not dist.is_available() or not dist.is_initialized():
        return
    try:
        dist.destroy_process_group()
    except ValueError as err:
        print(f"Error destroying default process group: {err}")
def launch(config: Config, args: argparse.Namespace) -> None:
    """Run one training job end to end.

    Steps, in order: validate and freeze the config, construct the trainer
    via ``config.trainer.type``, build the model, create the train and
    validation dataloaders, call ``trainer.train`` and finally tear down the
    distributed environment.

    Args:
        config: The fully resolved job config.
        args: Parsed command-line arguments; only ``args.mp0_only_dl`` is
            read here.

    Raises:
        NotImplementedError: If ``args.mp0_only_dl`` is set; that mode is
            explicitly rejected.
    """
    config.validate()
    # Freezing guards against accidental config edits during training.
    config.freeze()  # type: ignore
    trainer = config.trainer.type(config)
    # NOTE: the reproducibility logging step (log_reproducible_setup) is currently disabled.

    model = instantiate_model(config, trainer)
    model.on_model_init_end()

    if args.mp0_only_dl:
        log.critical(
            "Using only tp_cp_pp_rank0 dataloader for faster dataloading! Make sure val dl is mock and mock data has same keys as real data."
        )
        raise NotImplementedError(
            "mp0_only_dl is not implemented correctly! Please revisit this code and propose a more robust impl that raise error timely! It does not do necessary check before training to confirm it can work with image / video data. Current impl is problematic for image training."
        )

    # NOTE(review): mp0_only_dl always raises above, so the fallback to the
    # validation dataloader config below cannot currently be selected.
    wants_train_data = is_tp_cp_pp_rank0() or not args.mp0_only_dl
    train_loader = instantiate(config.dataloader_train if wants_train_data else config.dataloader_val)
    val_loader = instantiate(config.dataloader_val)

    trainer.train(model, train_loader, val_loader)
    destroy_distributed()
if __name__ == "__main__":
    # Command-line interface: a config path, two boolean flags, and any
    # trailing tokens collected verbatim as config overrides.
    parser = argparse.ArgumentParser(description="Training")
    parser.add_argument(
        "--config",
        help="Path to the config file",
        default="cosmos_predict1/diffusion/posttrain/config/config.py",
    )
    parser.add_argument(
        "opts",
        nargs=argparse.REMAINDER,
        default=None,
        help="""
Modify config options at the end of the command. For Yacs configs, use
space-separated "PATH.KEY VALUE" pairs.
For python-based LazyConfig, use "path.key=value".
""".strip(),
    )
    parser.add_argument(
        "--dryrun",
        help="Do a dry run without training. Useful for debugging the config.",
        action="store_true",
    )
    parser.add_argument(
        "--mp0_only_dl",
        help="Use only model parallel rank 0 dataloader for faster dataloading! Make sure mock data has same keys as real data.",
        action="store_true",
    )
    args = parser.parse_args()

    # Import the config module, build the base config, then apply overrides.
    config_module = get_config_module(args.config)
    config = override(importlib.import_module(config_module).make_config(), args.opts)

    if not args.dryrun:
        # Launch the training job.
        launch(config, args)
    else:
        # Dry run: write the resolved config to disk, echo it back as YAML,
        # then print where it was written.
        os.makedirs(config.job.path_local, exist_ok=True)
        yaml_path = f"{config.job.path_local}/config.yaml"
        LazyConfig.save_yaml(config, yaml_path)
        print(OmegaConf.to_yaml(OmegaConf.load(yaml_path)))
        print(yaml_path)