| | import os |
| | import re |
| | from huggingface_hub import HfApi, HfFolder, logging as hf_logging, create_repo |
| | from torchtitan.tools.logging import logger |
| |
|
def upload_checkpoint_to_hf(
    local_path: str,
    step: int,
    hf_repo_id_for_run: str,
    hf_keep_latest_k: int,
    upload_format: str,
) -> None:
    """Upload a checkpoint directory to the Hugging Face Hub and prune old ones.

    Args:
        local_path: Local directory containing the checkpoint files to upload.
        step: Training step number; the files land under ``step-{step}/`` in
            the repo, and retention sorting keys off this number.
        hf_repo_id_for_run: Target Hub repository id (created if missing).
        hf_keep_latest_k: Keep only this many newest ``step-*`` folders on the
            Hub; values <= 0 disable cleanup entirely.
        upload_format: Checkpoint format label (only used in the commit
            message, upper-cased).

    All failures are logged and swallowed (best-effort) so that a Hub outage
    never crashes the training run.
    """
    if not os.path.isdir(local_path):
        logger.error(f"Local path for upload does not exist or is not a directory: {local_path}")
        return

    api = HfApi()
    token = HfFolder.get_token()
    if not token:
        logger.warning("Hugging Face Hub token not found. Skipping upload. Login via `huggingface-cli login` or set HF_TOKEN.")
        return

    try:
        logger.info(f"Ensuring repository {hf_repo_id_for_run} exists...")
        # exist_ok=True makes this idempotent across repeated checkpoint steps.
        create_repo(repo_id=hf_repo_id_for_run, token=token, repo_type="model", exist_ok=True)
        logger.info(f"Repository {hf_repo_id_for_run} ensured.")
    except Exception as e:
        logger.error(f"Failed to create or ensure repository {hf_repo_id_for_run}: {e}", exc_info=True)
        return

    commit_message = f"Upload {upload_format.upper()} checkpoint step {step}"
    path_in_repo = f"step-{step}"

    logger.info(f"Uploading {local_path} to {hf_repo_id_for_run}/{path_in_repo} on Hugging Face Hub...")
    try:
        api.upload_folder(
            folder_path=local_path,
            path_in_repo=path_in_repo,
            repo_id=hf_repo_id_for_run,
            repo_type="model",
            commit_message=commit_message,
            token=token,
        )
        logger.info(f"Successfully uploaded step {step} to {hf_repo_id_for_run}.")
    except Exception as e:
        logger.error(f"Failed to upload checkpoint step {step} to {hf_repo_id_for_run}: {e}", exc_info=True)
        # FIX: the original fell through to retention cleanup even when the
        # upload failed, which could delete older checkpoints on the Hub while
        # the newest one never made it up. Bail out instead.
        return

    if hf_keep_latest_k > 0:
        logger.info(f"Cleaning up old checkpoints on {hf_repo_id_for_run}, keeping latest {hf_keep_latest_k}")
        try:
            repo_files = api.list_repo_tree(hf_repo_id_for_run, repo_type="model", token=token, recursive=False)
            # Only top-level entries named exactly "step-<digits>" are managed;
            # anything else in the repo (README, configs, ...) is left alone.
            step_folders = [
                item.path for item in repo_files
                if item.path.startswith("step-") and item.path[5:].isdigit()
            ]

            # Newest first, so everything past the first hf_keep_latest_k
            # entries is stale.
            step_folders.sort(key=lambda x: int(x.split('-')[1]), reverse=True)

            if len(step_folders) > hf_keep_latest_k:
                folders_to_delete = step_folders[hf_keep_latest_k:]
                logger.info(f"Found {len(step_folders)} checkpoints on Hub. Deleting {len(folders_to_delete)} older ones: {folders_to_delete}")
                for folder in folders_to_delete:
                    api.delete_folder(
                        repo_id=hf_repo_id_for_run,
                        path_in_repo=folder,
                        repo_type="model",
                        commit_message=f"Delete old checkpoint {folder}",
                        token=token,
                    )
                logger.info("Hub cleanup complete.")
            else:
                logger.info("No old checkpoints found on Hub to delete.")
        except Exception as e:
            logger.error(f"Error during Hub checkpoint cleanup for {hf_repo_id_for_run}: {e}", exc_info=True)