| |
| from huggingface_hub import HfApi |
| import sys |
| import os |
| import shutil |
| import time |
| import glob |
| from datetime import datetime |
| import tempfile |
|
|
def manage_backups(api, repo_id, max_files=10):
    """Prune old backup archives so the repo keeps at most ``max_files`` of them.

    Backups are the dataset-repo files named ``navidrome_backup_*.tar.gz``.
    They are ordered by file name and the first ones in that order are
    deleted, so "oldest first" holds as long as the part after the prefix
    sorts chronologically (``create_backup`` uses ``YYYYmmdd_HHMMSS``).

    The function is best-effort: a failure to list the repo, or to delete an
    individual file, is printed and never raised.

    Args:
        api: Client object exposing ``list_repo_files`` and ``delete_file``
            (an ``HfApi`` instance in this script).
        repo_id: Identifier of the dataset repository holding the backups.
        max_files: Maximum number of backups to keep. Negative values are
            treated as 0.

    Returns:
        None.
    """
    try:
        repo_files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        backups = sorted(
            name
            for name in repo_files
            if name.startswith('navidrome_backup_') and name.endswith('.tar.gz')
        )

        # Number of archives beyond the retention limit. The guard is needed
        # because a non-positive count must not reach the slice below (a
        # negative slice bound would select from the wrong end).
        excess = len(backups) - max(max_files, 0)
        if excess <= 0:
            return

        for stale in backups[:excess]:
            try:
                api.delete_file(path_in_repo=stale, repo_id=repo_id, repo_type="dataset")
                print(f'已删除旧备份: {stale}')
            except Exception as e:
                # Keep going: one failed deletion should not block the rest.
                print(f'删除 {stale} 时出错: {str(e)}')
    except Exception as e:
        print(f'管理备份时出错: {str(e)}')
|
|
def upload_backup(file_path, file_name, token, repo_id):
    """Upload a backup archive to a Hugging Face dataset repo, then prune old ones.

    Any exception raised by the upload or the follow-up pruning is printed
    rather than propagated, so the function always returns normally.

    Args:
        file_path: Local path of the archive to upload.
        file_name: Name the archive is stored under in the repository.
        token: Access token handed to ``HfApi``.
        repo_id: Identifier of the target dataset repository.

    Returns:
        None.
    """
    client = HfApi(token=token)
    upload_args = {
        "path_or_fileobj": file_path,
        "path_in_repo": file_name,
        "repo_id": repo_id,
        "repo_type": "dataset",
    }
    try:
        client.upload_file(**upload_args)
        print(f"成功上传 {file_name}")

        # Pruning only runs once the new archive is in the repo.
        manage_backups(client, repo_id)
    except Exception as err:
        print(f"文件上传出错: {str(err)}")
|
|
def download_latest_backup(token, repo_id, data_dir):
    """Download the newest backup archive and extract it into ``data_dir``.

    The newest backup is the ``navidrome_backup_*.tar.gz`` file whose name
    sorts last among the dataset repo's files. ``data_dir`` is created if
    needed and its mode is set to ``0o755`` before anything is downloaded.

    Args:
        token: Access token handed to ``HfApi``.
        repo_id: Identifier of the dataset repository holding the backups.
        data_dir: Directory the archive contents are extracted into.

    Returns:
        True if an archive was downloaded and extracted; False if the repo
        has no backup, the downloaded file is missing, or any error occurred
        (errors are printed, not raised).
    """
    # Function-local import: tarfile is not part of the file-level imports.
    import tarfile

    try:
        os.makedirs(data_dir, exist_ok=True)
        os.chmod(data_dir, 0o755)

        api = HfApi(token=token)
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        backup_files = [
            f for f in files
            if f.startswith('navidrome_backup_') and f.endswith('.tar.gz')
        ]

        if not backup_files:
            print("未找到备份文件")
            return False

        # Same pick as sorting and taking the last element, without the sort.
        latest_backup = max(backup_files)

        # The download lives in a throwaway directory that is removed as soon
        # as extraction is done.
        with tempfile.TemporaryDirectory() as temp_dir:
            filepath = api.hf_hub_download(
                repo_id=repo_id,
                filename=latest_backup,
                repo_type="dataset",
                local_dir=temp_dir
            )

            if not (filepath and os.path.exists(filepath)):
                return False

            with tarfile.open(filepath, "r:gz") as tar:
                # The archive comes from a remote repo, so refuse members that
                # would land outside data_dir (absolute paths, "..", links
                # pointing elsewhere). Interpreters without extraction-filter
                # support fall back to the plain call.
                # NOTE(review): the "data" filter also ignores stored
                # ownership and normalizes some permission bits — confirm
                # that is acceptable for restored data.
                if hasattr(tarfile, "data_filter"):
                    tar.extractall(path=data_dir, filter="data")
                else:
                    tar.extractall(path=data_dir)

            print(f"成功从 {latest_backup} 恢复备份到 {data_dir}")
            return True

    except Exception as e:
        print(f"下载备份时出错: {str(e)}")
        return False
|
|
def create_backup(data_dir, exclude_dirs=None):
    """Pack the top-level entries of ``data_dir`` into a timestamped ``.tar.gz``.

    The archive is named ``navidrome_backup_<YYYYmmdd_HHMMSS>.tar.gz`` (local
    time) and written to the system temporary directory. Entries are stored
    under their own name, without the ``data_dir`` prefix.

    Args:
        data_dir: Directory whose contents are archived.
        exclude_dirs: Names of top-level sub-directories to leave out. Only
            directories are skipped; a regular file with a listed name is
            still archived. Defaults to excluding nothing.

    Returns:
        ``(backup_path, backup_filename)`` on success, ``(None, None)`` if
        anything goes wrong (the error is printed, not raised).
    """
    # Function-local import: tarfile is not part of the file-level imports.
    import tarfile

    if exclude_dirs is None:
        exclude_dirs = []

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_filename = f"navidrome_backup_{timestamp}.tar.gz"
    # gettempdir() honours TMPDIR and works on hosts without /tmp.
    backup_path = os.path.join(tempfile.gettempdir(), backup_filename)

    try:
        with tarfile.open(backup_path, "w:gz") as tar:
            # Sorted so the member order does not depend on the file system.
            for item in sorted(os.listdir(data_dir)):
                item_path = os.path.join(data_dir, item)
                if os.path.isdir(item_path) and item in exclude_dirs:
                    continue
                tar.add(item_path, arcname=item)

        print(f"备份文件创建成功: {backup_path}")
        return backup_path, backup_filename
    except Exception as e:
        print(f"创建备份时出错: {str(e)}")
        # tarfile.open() has usually created the file already by the time a
        # later step fails; remove the truncated archive so failed runs do
        # not pile up in the temp directory.
        try:
            os.remove(backup_path)
        except OSError:
            # Nothing was created, or it cannot be removed; the failure has
            # already been reported above.
            pass
        return None, None
|
|
if __name__ == "__main__":
    # Expected invocation:
    #   <script> <upload|download> <token> <repo_id> <data_dir>
    if len(sys.argv) < 5:
        # Report the expected arguments instead of failing with an IndexError.
        print(f"用法: {sys.argv[0]} <upload|download> <token> <repo_id> <data_dir>")
        sys.exit(1)

    action, token, repo_id, data_dir = sys.argv[1:5]

    if action == "upload":
        # These two sub-directories are left out of the archive.
        exclude_dirs = ["cache", "music"]
        backup_path, backup_filename = create_backup(data_dir, exclude_dirs)
        if backup_path:
            try:
                upload_backup(backup_path, backup_filename, token, repo_id)
            finally:
                # Remove the temporary archive even if the upload is
                # interrupted (e.g. Ctrl-C), so it does not linger on disk.
                if os.path.exists(backup_path):
                    os.remove(backup_path)
    elif action == "download":
        download_latest_backup(token, repo_id, data_dir)
    else:
        # An unrecognised action used to be ignored silently.
        print(f"未知操作: {action}")
        sys.exit(1)