# scripts/utils/hf_persistence.py
#!/usr/bin/env python3
"""
Hugging Face Data Persistence Tool
This module provides functionality for interacting with Hugging Face Dataset, including:
- Uploading archive files
- Managing archive file count
- LFS storage cleanup
- Downloading and restoring archive files
- Listing available archive files
"""
import sys
import os
import traceback
import time
import tempfile
import subprocess
import argparse
from pathlib import Path
from huggingface_hub import HfApi
# Pin the process timezone to UTC so archive timestamps stay consistent.
os.environ['TZ'] = 'UTC'
time.tzset()  # NOTE(review): Unix-only; this module assumes a POSIX host
# Redirect the Hugging Face cache to a writable location under the user home.
cache_dir = '/home/user/.cache/huggingface'
os.makedirs(cache_dir, exist_ok=True)
for _cache_var in ('HF_HOME', 'HUGGINGFACE_HUB_CACHE'):
    os.environ[_cache_var] = cache_dir
class HFPersistenceManager:
    """Hugging Face Data Persistence Manager.

    Treats a (private) Hugging Face dataset repository as a backup store:
    uploads archive files, enforces a retention cap by deleting and
    recreating the repository (done here to reclaim LFS storage), lists
    available archives, and downloads/extracts them locally.
    """

    def __init__(self, token: str, dataset_id: str):
        """
        Initialize the manager

        Args:
            token: Hugging Face access token
            dataset_id: Dataset ID
        """
        self.token = token
        self.dataset_id = dataset_id
        self.api = None  # lazily constructed HfApi; see _get_api()
        # Export the token so huggingface_hub authenticates implicitly.
        os.environ['HUGGING_FACE_HUB_TOKEN'] = token

    def _get_api(self) -> HfApi:
        """Get (and cache) the HfApi instance."""
        if self.api is None:
            self.api = HfApi()
        return self.api

    def upload_archive(self, local_path: str, remote_path: str) -> bool:
        """
        Upload archive file to Hugging Face Dataset

        Args:
            local_path: Local file path
            remote_path: Remote file path
        Returns:
            bool: Whether upload was successful
        """
        try:
            api = self._get_api()
            # Ensure dataset exists and is private before uploading
            try:
                api.repo_info(repo_id=self.dataset_id, repo_type='dataset')
                print(f'βœ“ Dataset exists: {self.dataset_id}')
            except Exception:
                print(f'πŸ“ Dataset does not exist, creating private dataset: {self.dataset_id}')
                api.create_repo(repo_id=self.dataset_id, repo_type='dataset', private=True)
                print(f'βœ“ Private dataset created: {self.dataset_id}')
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=remote_path,
                repo_id=self.dataset_id,
                repo_type='dataset'
            )
            print(f'βœ“ Archive uploaded successfully: {remote_path}')
            return True
        except Exception as e:
            print(f'βœ— Archive upload failed: {str(e)}')
            traceback.print_exc()
            return False

    def check_and_cleanup_before_upload(self, archive_prefix: str, archive_extension: str, max_files: int) -> bool:
        """
        Check if adding new archive would exceed limit, if so, force recreate dataset first

        Args:
            archive_prefix: Archive file prefix
            archive_extension: Archive file extension
            max_files: Maximum number of files to keep
        Returns:
            bool: Whether operation was successful
        """
        try:
            api = self._get_api()
            files = api.list_repo_files(repo_id=self.dataset_id, repo_type='dataset')
            archive_files = [f for f in files if f.startswith(archive_prefix) and f.endswith(f'.{archive_extension}')]
            # Newest first -- assumes filenames embed a lexically sortable
            # timestamp; TODO confirm against the archive naming scheme.
            archive_files.sort(reverse=True)
            # Check if adding 1 new file would exceed the limit
            if len(archive_files) + 1 > max_files:
                print(f'🚨 Adding new archive would exceed limit ({len(archive_files)} + 1 > {max_files})')
                print('🚨 Starting force dataset recreation to clean up old archives')
                # Keep only the newest (max_files - 1) files to make room for the new one
                files_to_keep = archive_files[:max_files - 1] if max_files > 1 else []
                if not self._cleanup_lfs_recreate(api, files_to_keep, max_files, archive_prefix, archive_extension):
                    return False
                print('βœ“ Dataset recreation completed, ready for new archive upload')
            else:
                print(f'βœ“ Archive count check passed ({len(archive_files)} + 1 <= {max_files})')
            return True
        except Exception as e:
            print(f'βœ— Pre-upload cleanup failed: {str(e)}')
            return False

    def _recreate_dataset(self, api: HfApi) -> None:
        """Delete then recreate the dataset repository to reclaim LFS storage.

        Raises whatever the underlying Hub API raises; callers handle it.
        """
        print('πŸ—‘οΈ Deleting dataset to clean LFS storage...')
        api.delete_repo(repo_id=self.dataset_id, repo_type='dataset')
        print('βœ“ Dataset deleted successfully')
        # Wait for deletion to complete on the Hub side before recreating.
        time.sleep(10)
        print('πŸ”¨ Recreating dataset...')
        api.create_repo(repo_id=self.dataset_id, repo_type='dataset', exist_ok=True, private=True)
        print('βœ“ Dataset recreated successfully')

    def _cleanup_lfs_recreate(self, api: HfApi, files_to_keep: list, max_files: int,
                              archive_prefix: str, archive_extension: str) -> bool:
        """Force delete and recreate dataset to clean LFS storage.

        Files in ``files_to_keep`` are downloaded into memory first and
        re-uploaded after the repository has been recreated.

        Note: max_files / archive_prefix / archive_extension are currently
        unused; retained for interface compatibility with existing callers.
        """
        print('🚨 WARNING: Force recreate mode enabled')
        print('πŸ“‹ This will delete and recreate the entire dataset to clean LFS storage')
        # Backup files that need to be preserved (best-effort per file).
        backup_data = []
        if files_to_keep:
            print(f'πŸ“¦ Backing up {len(files_to_keep)} files for restoration...')
            for file_name in files_to_keep:
                try:
                    # Download file content into memory before the repo is deleted.
                    file_path = api.hf_hub_download(
                        repo_id=self.dataset_id,
                        filename=file_name,
                        repo_type='dataset'
                    )
                    with open(file_path, 'rb') as f:
                        backup_data.append((file_name, f.read()))
                    print(f'βœ“ Backed up: {file_name}')
                except Exception as e:
                    print(f'βœ— Backup failed for {file_name}: {str(e)}')
        else:
            print('πŸ“ No files to preserve, proceeding with dataset cleanup')
        try:
            self._recreate_dataset(api)
            if files_to_keep:
                # Restore backed up files (best-effort per file).
                print('πŸ“€ Restoring backed up files...')
                for file_name, file_content in backup_data:
                    try:
                        # Stage the content in a temporary file for upload.
                        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                            temp_file.write(file_content)
                            temp_path = temp_file.name
                        try:
                            api.upload_file(
                                path_or_fileobj=temp_path,
                                path_in_repo=file_name,
                                repo_id=self.dataset_id,
                                repo_type='dataset'
                            )
                        finally:
                            # Always remove the temp file, even if the upload
                            # raised (the original leaked it on failure).
                            os.unlink(temp_path)
                        print(f'βœ“ Restored: {file_name}')
                    except Exception as e:
                        print(f'βœ— Restore failed for {file_name}: {str(e)}')
            print('πŸŽ‰ Dataset recreation and LFS cleanup completed!')
            return True
        except Exception as e:
            print(f'βœ— Dataset recreation failed: {str(e)}')
            print('⚠️ Manual intervention may be required')
            return False

    def list_available_archives(self, archive_prefix: str, archive_extension: str) -> tuple[bool, str]:
        """
        List available archive files

        Args:
            archive_prefix: Archive file prefix
            archive_extension: Archive file extension
        Returns:
            tuple: (success status, latest archive filename; "" when none found)
        """
        try:
            api = self._get_api()
            files = api.list_repo_files(self.dataset_id, repo_type='dataset')
            archive_files = [f for f in files if f.startswith(archive_prefix) and f.endswith(f'.{archive_extension}')]
            archive_files.sort(reverse=True)
            if archive_files:
                print('Available archive list:')
                for i, archive in enumerate(archive_files, 1):
                    print(f'  {i}. {archive}')
                # Machine-readable marker consumed by calling scripts.
                print(f'LATEST_BACKUP:{archive_files[0]}')
                return True, archive_files[0]
            else:
                print('No archive files found')
                return False, ""
        except Exception as e:
            print(f'Failed to get archive list: {str(e)}')
            traceback.print_exc()
            return False, ""

    def restore_from_archive(self, archive_name: str, restore_path: str) -> bool:
        """
        Restore archive from Hugging Face Dataset

        Args:
            archive_name: Archive filename
            restore_path: Restore path
        Returns:
            bool: Whether restoration was successful
        """
        try:
            api = self._get_api()
            # Ensure download directory exists with proper permissions
            download_dir = '/home/user/download'
            os.makedirs(download_dir, exist_ok=True)
            # Download archive file to user directory
            print(f'Downloading archive: {archive_name}')
            local_path = api.hf_hub_download(
                repo_id=self.dataset_id,
                filename=archive_name,
                repo_type='dataset',
                local_dir=download_dir
            )
            try:
                # Extract archive with better error handling
                print(f'Extracting archive to: {restore_path}')
                # Use subprocess for better control and error handling
                # Set UTC timezone for timestamp consistency
                env = os.environ.copy()
                env['TZ'] = 'UTC'
                extract_cmd = [
                    'tar', '-xzf', local_path, '-C', restore_path,
                    '--warning=no-timestamp',  # Suppress timestamp warnings
                    '--warning=no-unknown-keyword',  # Suppress unknown keyword warnings
                    '--no-same-owner',  # Don't try to restore original ownership
                    '--no-same-permissions',  # Don't try to restore original permissions
                    '--touch'  # Set extracted files timestamps to current UTC time
                ]
                result = subprocess.run(extract_cmd, capture_output=True, text=True, env=env)
                if result.returncode == 0:
                    print(f'βœ“ Archive restored successfully: {archive_name}')
                    print('βœ“ Timestamps normalized to UTC timezone')
                    return True
                print(f'βœ— Archive extraction failed with return code: {result.returncode}')
                if result.stderr:
                    print(f'Error output: {result.stderr}')
                return False
            finally:
                # Always clean up the downloaded archive, including on failed
                # extraction (the original leaked it in that case).
                if os.path.exists(local_path):
                    os.remove(local_path)
        except Exception as e:
            print(f'βœ— Archive restoration failed: {str(e)}')
            traceback.print_exc()
            return False
def main():
    """Parse CLI arguments and dispatch to the persistence manager."""
    parser = argparse.ArgumentParser(description='Hugging Face Data Persistence Tool')
    parser.add_argument('action', choices=['upload', 'list', 'restore'],
                        help='Action to perform')
    parser.add_argument('--token', required=True, help='Hugging Face access token')
    parser.add_argument('--dataset-id', required=True, help='Dataset ID')
    parser.add_argument('--archive-file', help='Archive file path (for upload)')
    parser.add_argument('--filename', help='Remote filename (for upload)')
    parser.add_argument('--archive-prefix', default='backup', help='Archive file prefix')
    parser.add_argument('--archive-extension', default='tar.gz', help='Archive file extension')
    parser.add_argument('--max-archives', type=int, default=5, help='Maximum number of archives to keep')
    parser.add_argument('--archive-name', help='Archive name to restore (for restore)')
    parser.add_argument('--restore-path', default='./', help='Restore path (for restore)')
    args = parser.parse_args()

    manager = HFPersistenceManager(args.token, args.dataset_id)

    if args.action == 'upload':
        if not (args.archive_file and args.filename):
            print('βœ— upload action requires --archive-file and --filename parameters')
            sys.exit(1)
        # Enforce the retention cap BEFORE uploading; skip upload on failure.
        ok = manager.check_and_cleanup_before_upload(
            args.archive_prefix, args.archive_extension, args.max_archives)
        ok = ok and manager.upload_archive(args.archive_file, args.filename)
        sys.exit(0 if ok else 1)

    if args.action == 'list':
        ok, _latest = manager.list_available_archives(args.archive_prefix, args.archive_extension)
        sys.exit(0 if ok else 1)

    # Only 'restore' remains (argparse choices guarantee it).
    if not args.archive_name:
        print('βœ— restore action requires --archive-name parameter')
        sys.exit(1)
    ok = manager.restore_from_archive(args.archive_name, args.restore_path)
    sys.exit(0 if ok else 1)


if __name__ == '__main__':
    main()