| import os |
| import re |
| from huggingface_hub import HfApi, HfFolder, logging as hf_logging, create_repo |
| from torchtitan.tools.logging import logger |
|
|
def _checkpoint_step(path: str):
    """Return the step number encoded in a ``step-<N>`` path, or ``None``."""
    prefix = "step-"
    suffix = path[len(prefix):]
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits that int() rejects, which would make the
    # conversion below raise and abort the whole cleanup.
    if path.startswith(prefix) and suffix.isdecimal():
        return int(suffix)
    return None


def _prune_old_checkpoints(api, token, repo_id, keep_latest_k, protected_path):
    """Delete all but the ``keep_latest_k`` highest-numbered ``step-<N>`` entries.

    Args:
        api: ``HfApi`` client used for listing and deleting.
        token: Hub token passed to every API call.
        repo_id: Repository to clean up.
        keep_latest_k: Number of highest-step entries to retain.
        protected_path: Path in the repo that is never deleted, even when it
            falls outside the ``keep_latest_k`` window.
    """
    logger.info(f"Cleaning up old checkpoints on {repo_id}, keeping latest {keep_latest_k}")

    steps_by_folder = {}
    try:
        repo_files = api.list_repo_tree(repo_id, repo_type="model", token=token, recursive=False)
        # Consume the listing inside the try so that errors raised while
        # iterating over it are caught as well.
        for item in repo_files:
            parsed_step = _checkpoint_step(item.path)
            if parsed_step is not None:
                steps_by_folder[item.path] = parsed_step
    except Exception as e:
        logger.error(f"Error during Hub checkpoint cleanup for {repo_id}: {e}", exc_info=True)
        return

    # Highest step first; everything past the first keep_latest_k is stale.
    step_folders = sorted(steps_by_folder, key=steps_by_folder.get, reverse=True)
    folders_to_delete = [
        folder for folder in step_folders[keep_latest_k:]
        if folder != protected_path
    ]

    if not folders_to_delete:
        logger.info("No old checkpoints found on Hub to delete.")
        return

    logger.info(f"Found {len(step_folders)} checkpoints on Hub. Deleting {len(folders_to_delete)} older ones: {folders_to_delete}")
    failed = []
    for folder in folders_to_delete:
        # Isolate each deletion so one failure does not leave the remaining
        # stale checkpoints behind.
        try:
            api.delete_folder(
                repo_id=repo_id,
                path_in_repo=folder,
                repo_type="model",
                commit_message=f"Delete old checkpoint {folder}",
                token=token,
            )
        except Exception as e:
            failed.append(folder)
            logger.error(f"Failed to delete old checkpoint {folder} from {repo_id}: {e}", exc_info=True)

    if failed:
        logger.warning(f"Hub cleanup finished with {len(failed)} failed deletion(s): {failed}")
    else:
        logger.info("Hub cleanup complete.")


def upload_checkpoint_to_hf(
    local_path: str,
    step: int,
    hf_repo_id_for_run: str,
    hf_keep_latest_k: int,
    upload_format: str,
):
    """Upload a checkpoint directory to the HF Hub and apply retention.

    The directory is uploaded to ``step-<step>`` inside the model repository
    ``hf_repo_id_for_run``, which is created first if needed
    (``exist_ok=True``). Afterwards, if ``hf_keep_latest_k`` is positive,
    older ``step-<N>`` entries at the repository root are deleted so that only
    the ``hf_keep_latest_k`` highest-numbered ones remain. The entry for the
    current ``step`` is never deleted by this call.

    Hub failures are logged, not raised. The function returns early (without
    uploading) when ``local_path`` is not a directory, when no Hub token is
    available, or when the repository cannot be created/ensured.

    Args:
        local_path: Local directory holding the checkpoint to upload.
        step: Training step; determines the ``step-<step>`` target folder.
        hf_repo_id_for_run: Hub repository id to upload into.
        hf_keep_latest_k: Number of checkpoints to retain on the Hub; values
            ``<= 0`` disable cleanup.
        upload_format: Checkpoint format label; only used (upper-cased) in the
            commit message.

    Returns:
        None.
    """
    if not os.path.isdir(local_path):
        logger.error(f"Local path for upload does not exist or is not a directory: {local_path}")
        return

    api = HfApi()
    token = HfFolder.get_token()
    if not token:
        logger.warning("Hugging Face Hub token not found. Skipping upload. Login via `huggingface-cli login` or set HF_TOKEN.")
        return

    try:
        logger.info(f"Ensuring repository {hf_repo_id_for_run} exists...")
        create_repo(repo_id=hf_repo_id_for_run, token=token, repo_type="model", exist_ok=True)
        logger.info(f"Repository {hf_repo_id_for_run} ensured.")
    except Exception as e:
        logger.error(f"Failed to create or ensure repository {hf_repo_id_for_run}: {e}", exc_info=True)
        return

    commit_message = f"Upload {upload_format.upper()} checkpoint step {step}"
    path_in_repo = f"step-{step}"

    logger.info(f"Uploading {local_path} to {hf_repo_id_for_run}/{path_in_repo} on Hugging Face Hub...")
    try:
        api.upload_folder(
            folder_path=local_path,
            path_in_repo=path_in_repo,
            repo_id=hf_repo_id_for_run,
            repo_type="model",
            commit_message=commit_message,
            token=token,
        )
        logger.info(f"Successfully uploaded step {step} to {hf_repo_id_for_run}.")
    except Exception as e:
        logger.error(f"Failed to upload checkpoint step {step} to {hf_repo_id_for_run}: {e}", exc_info=True)

    # Retention runs regardless of the upload outcome. The current step's
    # folder is protected so that a checkpoint whose step is lower than ones
    # already on the Hub is not deleted right after being uploaded.
    if hf_keep_latest_k > 0:
        _prune_old_checkpoints(
            api,
            token,
            hf_repo_id_for_run,
            hf_keep_latest_k,
            protected_path=path_in_repo,
        )