|
|
|
|
|
""" |
|
|
Hugging Face Data Persistence Tool |
|
|
|
|
|
This module provides functionality for interacting with Hugging Face Dataset, including: |
|
|
- Uploading archive files |
|
|
- Managing archive file count |
|
|
- LFS storage cleanup |
|
|
- Downloading and restoring archive files |
|
|
- Listing available archive files |
|
|
""" |
|
|
|
|
|
import sys |
|
|
import os |
|
|
import traceback |
|
|
import time |
|
|
import tempfile |
|
|
import subprocess |
|
|
import argparse |
|
|
from pathlib import Path |
|
|
from huggingface_hub import HfApi |
|
|
|
|
|
|
|
|
# Normalize the process timezone to UTC so archive timestamps are consistent
# across machines and runs.
os.environ['TZ'] = 'UTC'
# tzset() is POSIX-only (absent on Windows); guard so importing this module
# does not raise AttributeError on non-Unix platforms.
if hasattr(time, 'tzset'):
    time.tzset()


# Redirect the Hugging Face cache into a known writable location before any
# hub operation runs, so downloads and metadata land in a persistent path.
cache_dir = '/home/user/.cache/huggingface'
os.makedirs(cache_dir, exist_ok=True)
os.environ['HF_HOME'] = cache_dir
os.environ['HUGGINGFACE_HUB_CACHE'] = cache_dir
|
|
|
|
|
|
|
|
class HFPersistenceManager:
    """Hugging Face Data Persistence Manager.

    Stores backup archives in a (private) Hugging Face Dataset repository and
    keeps the archive count bounded.  Because deleting individual files does
    not reclaim LFS storage on the Hub, the cleanup path deletes and recreates
    the whole dataset, re-uploading the archives that must survive.
    """

    def __init__(self, token: str, dataset_id: str):
        """
        Initialize the manager.

        Args:
            token: Hugging Face access token
            dataset_id: Dataset ID (e.g. "user/my-backups")
        """
        self.token = token
        self.dataset_id = dataset_id
        self.api = None  # created lazily by _get_api()

        # huggingface_hub picks the token up from this environment variable.
        os.environ['HUGGING_FACE_HUB_TOKEN'] = token

    def _get_api(self) -> HfApi:
        """Return the lazily constructed, cached HfApi instance."""
        if self.api is None:
            self.api = HfApi()
        return self.api

    def _find_archives(self, api: HfApi, archive_prefix: str, archive_extension: str) -> list:
        """Return archive filenames matching prefix/extension, sorted newest-first.

        Sorting is reverse-lexicographic; this yields newest-first assuming
        archive names embed a sortable timestamp — TODO confirm with the
        naming scheme used by the uploader.
        """
        files = api.list_repo_files(repo_id=self.dataset_id, repo_type='dataset')
        archives = [f for f in files
                    if f.startswith(archive_prefix) and f.endswith(f'.{archive_extension}')]
        archives.sort(reverse=True)
        return archives

    def upload_archive(self, local_path: str, remote_path: str) -> bool:
        """
        Upload archive file to Hugging Face Dataset.

        Creates the dataset as a private repo on first use.

        Args:
            local_path: Local file path
            remote_path: Remote file path

        Returns:
            bool: Whether upload was successful
        """
        try:
            api = self._get_api()

            # repo_info raises when the dataset is missing; create it then.
            try:
                api.repo_info(repo_id=self.dataset_id, repo_type='dataset')
                print(f'β Dataset exists: {self.dataset_id}')
            except Exception:
                print(f'π Dataset does not exist, creating private dataset: {self.dataset_id}')
                api.create_repo(repo_id=self.dataset_id, repo_type='dataset', private=True)
                print(f'β Private dataset created: {self.dataset_id}')

            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=remote_path,
                repo_id=self.dataset_id,
                repo_type='dataset'
            )
            print(f'β Archive uploaded successfully: {remote_path}')
            return True
        except Exception as e:
            print(f'β Archive upload failed: {str(e)}')
            traceback.print_exc()
            return False

    def check_and_cleanup_before_upload(self, archive_prefix: str, archive_extension: str, max_files: int) -> bool:
        """
        Check if adding new archive would exceed limit, if so, force recreate dataset first.

        A dataset that does not exist yet is treated as containing zero
        archives (the subsequent upload will create it) instead of failing.

        Args:
            archive_prefix: Archive file prefix
            archive_extension: Archive file extension
            max_files: Maximum number of files to keep

        Returns:
            bool: Whether operation was successful
        """
        try:
            api = self._get_api()

            # Bug fix: previously a missing dataset made list_repo_files
            # raise, the broad except returned False, and the very first
            # upload was aborted.  Treat "no repo" as "no archives".
            try:
                archive_files = self._find_archives(api, archive_prefix, archive_extension)
            except Exception:
                print(f'π Dataset not found, skipping cleanup: {self.dataset_id}')
                return True

            if len(archive_files) + 1 > max_files:
                print(f'π¨ Adding new archive would exceed limit ({len(archive_files)} + 1 > {max_files})')
                print('π¨ Starting force dataset recreation to clean up old archives')

                # Keep the newest (max_files - 1) archives so that after the
                # upcoming upload the total is exactly max_files.
                files_to_keep = archive_files[:max_files - 1] if max_files > 1 else []

                if not self._cleanup_lfs_recreate(api, files_to_keep, max_files, archive_prefix, archive_extension):
                    return False

                print(f'β Dataset recreation completed, ready for new archive upload')
            else:
                print(f'β Archive count check passed ({len(archive_files)} + 1 <= {max_files})')

            return True
        except Exception as e:
            print(f'β Pre-upload cleanup failed: {str(e)}')
            return False

    def _delete_and_recreate_repo(self, api: HfApi) -> None:
        """Delete the dataset and recreate it empty.

        This is the only way to reclaim LFS storage held by old archives.
        Raises on failure; callers handle the exception.
        """
        print('ποΈ Deleting dataset to clean LFS storage...')
        api.delete_repo(repo_id=self.dataset_id, repo_type='dataset')
        print('β Dataset deleted successfully')

        # Give the Hub time to finish processing the deletion before the
        # same repo name is reused.
        time.sleep(10)

        print('π¨ Recreating dataset...')
        api.create_repo(repo_id=self.dataset_id, repo_type='dataset', exist_ok=True, private=True)
        print('β Dataset recreated successfully')

    def _cleanup_lfs_recreate(self, api: HfApi, files_to_keep: list, max_files: int,
                              archive_prefix: str, archive_extension: str) -> bool:
        """Force delete and recreate dataset to clean LFS storage.

        Backs up ``files_to_keep`` into memory, deletes and recreates the
        dataset, then re-uploads the backed up files.  Backup/restore of
        individual files is best-effort: a file that fails is reported and
        skipped, not fatal.

        max_files / archive_prefix / archive_extension are currently unused;
        retained for signature stability.
        """
        print('π¨ WARNING: Force recreate mode enabled')
        print('π This will delete and recreate the entire dataset to clean LFS storage')

        # Download every file that must survive the recreation.
        backup_data = []
        if files_to_keep:
            print(f'π¦ Backing up {len(files_to_keep)} files for restoration...')
            for file_name in files_to_keep:
                try:
                    file_path = api.hf_hub_download(
                        repo_id=self.dataset_id,
                        filename=file_name,
                        repo_type='dataset'
                    )
                    with open(file_path, 'rb') as f:
                        backup_data.append((file_name, f.read()))
                    print(f'β Backed up: {file_name}')
                except Exception as e:
                    # Best effort: a file that cannot be downloaded is lost.
                    print(f'β Backup failed for {file_name}: {str(e)}')
        else:
            print('π No files to preserve, proceeding with dataset cleanup')

        try:
            self._delete_and_recreate_repo(api)

            if backup_data:
                print('π€ Restoring backed up files...')
                for file_name, file_content in backup_data:
                    try:
                        # Stage content in a temp file because upload_file
                        # wants a path or file object on disk.
                        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                            temp_file.write(file_content)
                            temp_path = temp_file.name

                        # Bug fix: unlink in a finally so a failed upload no
                        # longer leaks the temp file.
                        try:
                            api.upload_file(
                                path_or_fileobj=temp_path,
                                path_in_repo=file_name,
                                repo_id=self.dataset_id,
                                repo_type='dataset'
                            )
                        finally:
                            os.unlink(temp_path)

                        print(f'β Restored: {file_name}')
                    except Exception as e:
                        print(f'β Restore failed for {file_name}: {str(e)}')

            print('π Dataset recreation and LFS cleanup completed!')
            return True

        except Exception as e:
            print(f'β Dataset recreation failed: {str(e)}')
            print('β οΈ Manual intervention may be required')
            return False

    def list_available_archives(self, archive_prefix: str, archive_extension: str) -> tuple[bool, str]:
        """
        List available archive files.

        Args:
            archive_prefix: Archive file prefix
            archive_extension: Archive file extension

        Returns:
            tuple: (success status, latest archive filename; "" when none)
        """
        try:
            api = self._get_api()
            archive_files = self._find_archives(api, archive_prefix, archive_extension)

            if archive_files:
                print('Available archive list:')
                for i, archive in enumerate(archive_files, 1):
                    print(f'  {i}. {archive}')

                # Machine-readable marker — presumably parsed by a calling
                # shell script; keep the exact format.
                print(f'LATEST_BACKUP:{archive_files[0]}')
                return True, archive_files[0]
            else:
                print('No archive files found')
                return False, ""
        except Exception as e:
            print(f'Failed to get archive list: {str(e)}')
            traceback.print_exc()
            return False, ""

    def restore_from_archive(self, archive_name: str, restore_path: str) -> bool:
        """
        Restore archive from Hugging Face Dataset.

        Downloads the archive and extracts it with tar into ``restore_path``,
        normalizing timestamps and ownership so extraction succeeds in
        restricted environments.

        Args:
            archive_name: Archive filename
            restore_path: Restore path

        Returns:
            bool: Whether restoration was successful
        """
        try:
            api = self._get_api()

            # Download into a fixed, writable staging directory.
            download_dir = '/home/user/download'
            os.makedirs(download_dir, exist_ok=True)

            print(f'Downloading archive: {archive_name}')
            local_path = api.hf_hub_download(
                repo_id=self.dataset_id,
                filename=archive_name,
                repo_type='dataset',
                local_dir=download_dir
            )

            print(f'Extracting archive to: {restore_path}')

            # Extract under UTC so tar's timestamp handling is deterministic.
            env = os.environ.copy()
            env['TZ'] = 'UTC'

            # List-form argv (no shell).  The flags silence timestamp /
            # unknown-keyword warnings, drop ownership/permissions from the
            # archive, and reset mtimes to extraction time.
            extract_cmd = [
                'tar', '-xzf', local_path, '-C', restore_path,
                '--warning=no-timestamp',
                '--warning=no-unknown-keyword',
                '--no-same-owner',
                '--no-same-permissions',
                '--touch'
            ]

            result = subprocess.run(extract_cmd, capture_output=True, text=True, env=env)

            if result.returncode == 0:
                print(f'β Archive restored successfully: {archive_name}')
                print('β Timestamps normalized to UTC timezone')

                # Remove the downloaded archive only after a successful
                # extract (kept on failure for inspection).
                os.remove(local_path)
                return True
            else:
                print(f'β Archive extraction failed with return code: {result.returncode}')
                if result.stderr:
                    print(f'Error output: {result.stderr}')
                return False

        except Exception as e:
            print(f'β Archive restoration failed: {str(e)}')
            traceback.print_exc()
            return False
|
|
|
|
|
|
|
|
def main():
    """Command line entry point.

    Parses arguments and dispatches to HFPersistenceManager.  Exits with
    status 0 on success and 1 on failure; argparse itself exits with 2 on
    invalid usage.
    """
    parser = argparse.ArgumentParser(description='Hugging Face Data Persistence Tool')
    parser.add_argument('action', choices=['upload', 'list', 'restore'],
                        help='Action to perform')
    parser.add_argument('--token', required=True, help='Hugging Face access token')
    parser.add_argument('--dataset-id', required=True, help='Dataset ID')
    parser.add_argument('--archive-file', help='Archive file path (for upload)')
    parser.add_argument('--filename', help='Remote filename (for upload)')
    parser.add_argument('--archive-prefix', default='backup', help='Archive file prefix')
    parser.add_argument('--archive-extension', default='tar.gz', help='Archive file extension')
    parser.add_argument('--max-archives', type=int, default=5, help='Maximum number of archives to keep')
    parser.add_argument('--archive-name', help='Archive name to restore (for restore)')
    parser.add_argument('--restore-path', default='./', help='Restore path (for restore)')

    args = parser.parse_args()

    manager = HFPersistenceManager(args.token, args.dataset_id)

    if args.action == 'upload':
        if not args.archive_file or not args.filename:
            print('β upload action requires --archive-file and --filename parameters')
            sys.exit(1)

        # Fail fast with a clear message instead of letting the upload call
        # discover the missing file later (same exit status either way).
        if not os.path.isfile(args.archive_file):
            print(f'β Archive file not found: {args.archive_file}')
            sys.exit(1)

        # Make room first so the upload never pushes the count over the limit.
        success = manager.check_and_cleanup_before_upload(args.archive_prefix, args.archive_extension, args.max_archives)
        if success:
            success = manager.upload_archive(args.archive_file, args.filename)

        sys.exit(0 if success else 1)

    elif args.action == 'list':
        success, _latest = manager.list_available_archives(args.archive_prefix, args.archive_extension)
        sys.exit(0 if success else 1)

    elif args.action == 'restore':
        if not args.archive_name:
            print('β restore action requires --archive-name parameter')
            sys.exit(1)

        success = manager.restore_from_archive(args.archive_name, args.restore_path)
        sys.exit(0 if success else 1)
|
|
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()