Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| from huggingface_hub import HfApi | |
| import sys | |
| import os | |
| import shutil | |
| import time | |
| import glob | |
| from datetime import datetime | |
| import tempfile | |
def manage_backups(api, repo_id, max_files=10):
    """Prune old backup archives so that at most ``max_files`` remain.

    Lists the files of the dataset repo ``repo_id``, selects the names that
    look like ``navidrome_backup_*.tar.gz`` and deletes the lexicographically
    smallest ones until no more than ``max_files`` are left.

    Args:
        api: Client object exposing ``list_repo_files`` and ``delete_file``
            (both called with ``repo_id`` and ``repo_type="dataset"``).
        repo_id: Identifier of the dataset repository holding the backups.
        max_files: Maximum number of backup archives to keep. Values below
            zero are treated as zero (every backup is removed).

    Returns:
        None. Failures are printed and never raised, so pruning stays
        best-effort.
    """
    try:
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        backup_files = sorted(
            f for f in files
            if f.startswith('navidrome_backup_') and f.endswith('.tar.gz')
        )
        # Number of archives beyond the retention limit. The previous
        # ">= max_files" / "+ 1" arithmetic always removed one archive too
        # many, leaving only max_files - 1 behind.
        excess = len(backup_files) - max(max_files, 0)
        if excess <= 0:
            return
        for file_to_delete in backup_files[:excess]:
            # One failed deletion must not stop the remaining ones.
            try:
                api.delete_file(
                    path_in_repo=file_to_delete,
                    repo_id=repo_id,
                    repo_type="dataset",
                )
                print(f'已删除旧备份: {file_to_delete}')
            except Exception as e:
                print(f'删除 {file_to_delete} 时出错: {str(e)}')
    except Exception as e:
        print(f'管理备份时出错: {str(e)}')
def upload_backup(file_path, file_name, token, repo_id):
    """Upload a backup archive to a Hugging Face dataset repo, then prune.

    Args:
        file_path: Local path of the archive to upload.
        file_name: Name the archive gets inside the repository.
        token: Access token handed to ``HfApi``.
        repo_id: Identifier of the target dataset repository.

    Returns:
        True when the upload succeeded, False when it raised. Previously the
        function always returned None, so a caller could not tell the two
        apart; callers that ignore the result are unaffected.
    """
    api = HfApi(token=token)
    # Only the upload is guarded here, so a problem during pruning can no
    # longer be reported as an upload failure.
    try:
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=file_name,
            repo_id=repo_id,
            repo_type="dataset"
        )
    except Exception as e:
        print(f"文件上传出错: {str(e)}")
        return False
    print(f"成功上传 {file_name}")
    # Retention runs only after a successful upload.
    manage_backups(api, repo_id)
    return True
def download_latest_backup(token, repo_id, data_dir):
    """Download the newest backup archive and unpack it into ``data_dir``.

    The newest archive is the lexicographically largest repository file named
    ``navidrome_backup_*.tar.gz``. It is downloaded into a temporary
    directory, which is removed again once extraction has finished.

    Args:
        token: Access token handed to ``HfApi``.
        repo_id: Identifier of the dataset repository holding the backups.
        data_dir: Directory to restore into; created if missing and set to
            mode 0o755.

    Returns:
        True if an archive was downloaded and extracted, False if no backup
        exists, the download produced no file, or any error occurred (errors
        are printed, not raised).
    """
    import tarfile

    try:
        # Make sure the target directory exists with the expected permissions.
        os.makedirs(data_dir, exist_ok=True)
        os.chmod(data_dir, 0o755)
        api = HfApi(token=token)
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        backup_files = [
            f for f in files
            if f.startswith('navidrome_backup_') and f.endswith('.tar.gz')
        ]
        if not backup_files:
            print("未找到备份文件")
            return False
        latest_backup = max(backup_files)
        # Download into a throw-away directory.
        with tempfile.TemporaryDirectory() as temp_dir:
            filepath = api.hf_hub_download(
                repo_id=repo_id,
                filename=latest_backup,
                repo_type="dataset",
                local_dir=temp_dir
            )
            if not (filepath and os.path.exists(filepath)):
                return False
            with tarfile.open(filepath, "r:gz") as tar:
                if hasattr(tarfile, "data_filter"):
                    # The archive comes from a remote repo: let tarfile
                    # reject absolute paths, "../" escapes, unsafe links
                    # and special files.
                    tar.extractall(path=data_dir, filter="data")
                else:
                    # Interpreters without extraction filters: at least
                    # refuse member names that resolve outside data_dir.
                    # NOTE(review): this fallback does not inspect link
                    # targets inside the archive.
                    root = os.path.realpath(data_dir)
                    for member in tar.getmembers():
                        target = os.path.realpath(os.path.join(root, member.name))
                        if os.path.commonpath([root, target]) != root:
                            raise ValueError(
                                f"unsafe path in archive: {member.name}"
                            )
                    tar.extractall(path=data_dir)
            print(f"成功从 {latest_backup} 恢复备份到 {data_dir}")
            return True
    except Exception as e:
        print(f"下载备份时出错: {str(e)}")
        return False
def create_backup(data_dir, exclude_dirs=None):
    """Pack the contents of ``data_dir`` into a timestamped ``.tar.gz``.

    The archive is written to ``/tmp/navidrome_backup_<YYYYmmdd_HHMMSS>.tar.gz``.
    Each top-level entry of ``data_dir`` is stored under its own name, so the
    archive unpacks directly into a data directory.

    Args:
        data_dir: Directory whose top-level entries are archived.
        exclude_dirs: Names of top-level *directories* to skip. A regular
            file with one of these names is still archived. Defaults to no
            exclusions.

    Returns:
        ``(backup_path, backup_filename)`` on success, ``(None, None)`` on
        any error (the error is printed, not raised).
    """
    import tarfile

    if exclude_dirs is None:
        exclude_dirs = []
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_filename = f"navidrome_backup_{timestamp}.tar.gz"
    backup_path = f"/tmp/{backup_filename}"
    try:
        with tarfile.open(backup_path, "w:gz") as tar:
            for item in os.listdir(data_dir):
                item_path = os.path.join(data_dir, item)
                # Skip excluded directories.
                if os.path.isdir(item_path) and item in exclude_dirs:
                    continue
                tar.add(item_path, arcname=item)
        print(f"备份文件创建成功: {backup_path}")
        return backup_path, backup_filename
    except Exception as e:
        print(f"创建备份时出错: {str(e)}")
        # tarfile.open() has usually created the file by the time something
        # fails; drop the partial archive instead of leaving it in /tmp.
        try:
            if os.path.exists(backup_path):
                os.remove(backup_path)
        except OSError as cleanup_error:
            print(f"清理临时文件时出错: {str(cleanup_error)}")
        return None, None
def main(argv=None):
    """Command-line entry point: ``<upload|download> <token> <repo_id> <data_dir>``.

    Args:
        argv: Full argument vector including the program name. Defaults to
            ``sys.argv``.

    Returns:
        Process exit status: 0 after running ``upload`` or ``download``
        (their own failures are only printed, as before), 2 when arguments
        are missing or the action is unknown.
    """
    args = sys.argv if argv is None else argv
    # Indexing argv blindly used to end in a bare IndexError traceback.
    if len(args) < 5:
        prog = args[0] if args else "<script>"
        print(
            f"用法: {prog} <upload|download> <token> <repo_id> <data_dir>",
            file=sys.stderr,
        )
        return 2
    action, token, repo_id, data_dir = args[1:5]

    if action == "upload":
        # Leave out the music library and caches; back up data and config only.
        exclude_dirs = ["cache", "music"]
        backup_path, backup_filename = create_backup(data_dir, exclude_dirs)
        if backup_path:
            try:
                upload_backup(backup_path, backup_filename, token, repo_id)
            finally:
                # Always drop the temporary archive, even when the upload is
                # interrupted.
                if os.path.exists(backup_path):
                    os.remove(backup_path)
        return 0

    if action == "download":
        download_latest_backup(token, repo_id, data_dir)
        return 0

    # An unrecognised action used to do nothing and exit successfully.
    print(f"未知操作: {action}", file=sys.stderr)
    return 2


if __name__ == "__main__":
    sys.exit(main())