#!/usr/bin/env python3
"""
Hugging Face Data Persistence Tool

This module provides functionality for interacting with Hugging Face Dataset,
including:
- Uploading archive files
- Managing archive file count
- LFS storage cleanup
- Downloading and restoring archive files
- Listing available archive files
"""

import sys
import os
import traceback
import time
import tempfile
import subprocess
import argparse
from pathlib import Path

from huggingface_hub import HfApi

# Set UTC timezone so archive timestamps are consistent across environments
os.environ['TZ'] = 'UTC'
time.tzset()

# Set Hugging Face cache directory
cache_dir = '/home/user/.cache/huggingface'
os.makedirs(cache_dir, exist_ok=True)
os.environ['HF_HOME'] = cache_dir
os.environ['HUGGINGFACE_HUB_CACHE'] = cache_dir


class HFPersistenceManager:
    """Hugging Face Data Persistence Manager.

    Wraps a lazily-created HfApi client and provides upload, listing,
    restore, and LFS-cleanup operations against a single private dataset.
    """

    def __init__(self, token: str, dataset_id: str):
        """
        Initialize the manager.

        Args:
            token: Hugging Face access token
            dataset_id: Dataset ID (e.g. "user/my-backups")
        """
        self.token = token
        self.dataset_id = dataset_id
        self.api = None
        # Authentication is picked up by HfApi from the environment
        os.environ['HUGGING_FACE_HUB_TOKEN'] = token

    def _get_api(self) -> HfApi:
        """Return a cached HfApi instance, creating it on first use."""
        if self.api is None:
            self.api = HfApi()
        return self.api

    def upload_archive(self, local_path: str, remote_path: str) -> bool:
        """
        Upload an archive file to the Hugging Face Dataset.

        Creates the dataset (private) first if it does not exist.

        Args:
            local_path: Local file path
            remote_path: Remote file path (path inside the repo)

        Returns:
            bool: Whether the upload was successful
        """
        try:
            api = self._get_api()

            # Ensure dataset exists and is private before uploading
            try:
                api.repo_info(repo_id=self.dataset_id, repo_type='dataset')
                print(f'✓ Dataset exists: {self.dataset_id}')
            except Exception:
                print(f'📝 Dataset does not exist, creating private dataset: {self.dataset_id}')
                api.create_repo(repo_id=self.dataset_id, repo_type='dataset', private=True)
                print(f'✓ Private dataset created: {self.dataset_id}')

            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=remote_path,
                repo_id=self.dataset_id,
                repo_type='dataset'
            )
            print(f'✓ Archive uploaded successfully: {remote_path}')
            return True
        except Exception as e:
            print(f'✗ Archive upload failed: {str(e)}')
            traceback.print_exc()
            return False

    def _list_archive_files(self, api: HfApi, archive_prefix: str, archive_extension: str) -> list:
        """Return matching archive filenames in the repo, newest first.

        Sorting is lexicographic descending; assumes filenames embed a
        sortable timestamp — TODO confirm against the producer's naming.
        """
        files = api.list_repo_files(repo_id=self.dataset_id, repo_type='dataset')
        archive_files = [f for f in files
                         if f.startswith(archive_prefix) and f.endswith(f'.{archive_extension}')]
        archive_files.sort(reverse=True)  # Sort newest first
        return archive_files

    def check_and_cleanup_before_upload(self, archive_prefix: str, archive_extension: str, max_files: int) -> bool:
        """
        Check if adding a new archive would exceed the limit; if so, force
        recreate the dataset first (keeping the newest files).

        Args:
            archive_prefix: Archive file prefix
            archive_extension: Archive file extension (without leading dot)
            max_files: Maximum number of files to keep

        Returns:
            bool: Whether the operation was successful
        """
        try:
            api = self._get_api()

            # Bug fix: if the dataset does not exist yet, list_repo_files
            # raises and the upload used to be aborted. A missing repo simply
            # means there are no archives to clean up — upload_archive will
            # create the repo itself.
            try:
                archive_files = self._list_archive_files(api, archive_prefix, archive_extension)
            except Exception:
                print(f'📝 Dataset not accessible yet, skipping pre-upload cleanup: {self.dataset_id}')
                return True

            # Check if adding 1 new file would exceed the limit
            if len(archive_files) + 1 > max_files:
                print(f'🚨 Adding new archive would exceed limit ({len(archive_files)} + 1 > {max_files})')
                print('🚨 Starting force dataset recreation to clean up old archives')

                # Keep only the newest (max_files - 1) files to make room for the new one
                files_to_keep = archive_files[:max_files - 1] if max_files > 1 else []

                if not self._cleanup_lfs_recreate(api, files_to_keep, max_files,
                                                  archive_prefix, archive_extension):
                    return False
                print(f'✓ Dataset recreation completed, ready for new archive upload')
            else:
                print(f'✓ Archive count check passed ({len(archive_files)} + 1 <= {max_files})')
            return True
        except Exception as e:
            print(f'✗ Pre-upload cleanup failed: {str(e)}')
            return False

    def _delete_and_recreate_repo(self, api: HfApi) -> None:
        """Delete the dataset (freeing its LFS storage) and recreate it private.

        Raises on API failure; callers handle and report the error.
        """
        print('🗑️ Deleting dataset to clean LFS storage...')
        api.delete_repo(repo_id=self.dataset_id, repo_type='dataset')
        print('✓ Dataset deleted successfully')

        # Wait for deletion to complete (hub-side propagation delay)
        time.sleep(10)

        print('🔨 Recreating dataset...')
        api.create_repo(repo_id=self.dataset_id, repo_type='dataset',
                        exist_ok=True, private=True)
        print('✓ Dataset recreated successfully')

    def _cleanup_lfs_recreate(self, api: HfApi, files_to_keep: list, max_files: int,
                              archive_prefix: str, archive_extension: str) -> bool:
        """Force delete and recreate the dataset to clean LFS storage.

        Deleting a repo is the only way to reclaim LFS space; files listed in
        files_to_keep are downloaded first and re-uploaded afterwards.

        Args:
            api: HfApi client
            files_to_keep: Filenames to preserve across the recreate
            max_files: Unused; kept for call compatibility
            archive_prefix: Unused; kept for call compatibility
            archive_extension: Unused; kept for call compatibility

        Returns:
            bool: Whether the recreate succeeded
        """
        print('🚨 WARNING: Force recreate mode enabled')
        print('📋 This will delete and recreate the entire dataset to clean LFS storage')

        remaining_files = files_to_keep
        if remaining_files:
            print(f'📦 Backing up {len(remaining_files)} files for restoration...')
            backup_data = []
            for file_name in remaining_files:
                try:
                    # Download file content so it survives the repo deletion
                    file_path = api.hf_hub_download(
                        repo_id=self.dataset_id,
                        filename=file_name,
                        repo_type='dataset'
                    )
                    with open(file_path, 'rb') as f:
                        backup_data.append((file_name, f.read()))
                    print(f'✓ Backed up: {file_name}')
                except Exception as e:
                    print(f'✗ Backup failed for {file_name}: {str(e)}')

            try:
                self._delete_and_recreate_repo(api)

                # Restore backed up files
                print('📤 Restoring backed up files...')
                for file_name, file_content in backup_data:
                    try:
                        # Stage content in a temporary file for upload_file
                        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                            temp_file.write(file_content)
                            temp_path = temp_file.name
                        try:
                            api.upload_file(
                                path_or_fileobj=temp_path,
                                path_in_repo=file_name,
                                repo_id=self.dataset_id,
                                repo_type='dataset'
                            )
                        finally:
                            # Bug fix: always remove the temp file, even when
                            # the upload raises (previously leaked on failure)
                            os.unlink(temp_path)
                        print(f'✓ Restored: {file_name}')
                    except Exception as e:
                        print(f'✗ Restore failed for {file_name}: {str(e)}')

                print('🎉 Dataset recreation and LFS cleanup completed!')
                return True
            except Exception as e:
                print(f'✗ Dataset recreation failed: {str(e)}')
                print('⚠️ Manual intervention may be required')
                return False
        else:
            print('📝 No files to preserve, proceeding with dataset cleanup')
            # Delete dataset even when no files need to be preserved
            try:
                self._delete_and_recreate_repo(api)
                print('🎉 Dataset recreation and LFS cleanup completed!')
                return True
            except Exception as e:
                print(f'✗ Dataset recreation failed: {str(e)}')
                print('⚠️ Manual intervention may be required')
                return False

    def list_available_archives(self, archive_prefix: str, archive_extension: str) -> tuple[bool, str]:
        """
        List available archive files.

        Args:
            archive_prefix: Archive file prefix
            archive_extension: Archive file extension (without leading dot)

        Returns:
            tuple: (success status, latest archive filename or "")
        """
        try:
            api = self._get_api()
            # Consistency: use repo_id= keyword like every other call site
            archive_files = self._list_archive_files(api, archive_prefix, archive_extension)

            if archive_files:
                print('Available archive list:')
                for i, archive in enumerate(archive_files, 1):
                    print(f' {i}. {archive}')
                # Machine-readable marker consumed by calling scripts
                print(f'LATEST_BACKUP:{archive_files[0]}')
                return True, archive_files[0]
            else:
                print('No archive files found')
                return False, ""
        except Exception as e:
            print(f'Failed to get archive list: {str(e)}')
            traceback.print_exc()
            return False, ""

    def restore_from_archive(self, archive_name: str, restore_path: str) -> bool:
        """
        Restore an archive from the Hugging Face Dataset.

        Downloads the archive and extracts it with tar into restore_path,
        normalizing timestamps to current UTC time.

        Args:
            archive_name: Archive filename
            restore_path: Directory to extract into (must exist)

        Returns:
            bool: Whether restoration was successful
        """
        try:
            api = self._get_api()

            # Ensure download directory exists with proper permissions
            download_dir = '/home/user/download'
            os.makedirs(download_dir, exist_ok=True)

            # Download archive file to user directory
            print(f'Downloading archive: {archive_name}')
            local_path = api.hf_hub_download(
                repo_id=self.dataset_id,
                filename=archive_name,
                repo_type='dataset',
                local_dir=download_dir
            )

            # Extract archive with better error handling
            print(f'Extracting archive to: {restore_path}')

            # Use subprocess for better control and error handling;
            # set UTC timezone for timestamp consistency
            env = os.environ.copy()
            env['TZ'] = 'UTC'

            extract_cmd = [
                'tar', '-xzf', local_path,
                '-C', restore_path,
                '--warning=no-timestamp',        # Suppress timestamp warnings
                '--warning=no-unknown-keyword',  # Suppress unknown keyword warnings
                '--no-same-owner',               # Don't try to restore original ownership
                '--no-same-permissions',         # Don't try to restore original permissions
                '--touch'                        # Set extracted files timestamps to current UTC time
            ]

            result = subprocess.run(extract_cmd, capture_output=True, text=True, env=env)

            if result.returncode == 0:
                print(f'✓ Archive restored successfully: {archive_name}')
                print('✓ Timestamps normalized to UTC timezone')
                # Clean up temporary file
                os.remove(local_path)
                return True
            else:
                print(f'✗ Archive extraction failed with return code: {result.returncode}')
                if result.stderr:
                    print(f'Error output: {result.stderr}')
                return False
        except Exception as e:
            print(f'✗ Archive restoration failed: {str(e)}')
            traceback.print_exc()
            return False


def main():
    """Command line entry point."""
    parser = argparse.ArgumentParser(description='Hugging Face Data Persistence Tool')
    parser.add_argument('action', choices=['upload', 'list', 'restore'],
                        help='Action to perform')
    parser.add_argument('--token', required=True, help='Hugging Face access token')
    parser.add_argument('--dataset-id', required=True, help='Dataset ID')
    parser.add_argument('--archive-file', help='Archive file path (for upload)')
    parser.add_argument('--filename', help='Remote filename (for upload)')
    parser.add_argument('--archive-prefix', default='backup', help='Archive file prefix')
    parser.add_argument('--archive-extension', default='tar.gz', help='Archive file extension')
    parser.add_argument('--max-archives', type=int, default=5,
                        help='Maximum number of archives to keep')
    parser.add_argument('--archive-name', help='Archive name to restore (for restore)')
    parser.add_argument('--restore-path', default='./', help='Restore path (for restore)')

    args = parser.parse_args()

    manager = HFPersistenceManager(args.token, args.dataset_id)

    if args.action == 'upload':
        if not args.archive_file or not args.filename:
            print('✗ upload action requires --archive-file and --filename parameters')
            sys.exit(1)
        # First check and cleanup if needed BEFORE uploading
        success = manager.check_and_cleanup_before_upload(
            args.archive_prefix, args.archive_extension, args.max_archives)
        if success:
            success = manager.upload_archive(args.archive_file, args.filename)
        sys.exit(0 if success else 1)
    elif args.action == 'list':
        success, latest = manager.list_available_archives(
            args.archive_prefix, args.archive_extension)
        sys.exit(0 if success else 1)
    elif args.action == 'restore':
        if not args.archive_name:
            print('✗ restore action requires --archive-name parameter')
            sys.exit(1)
        success = manager.restore_from_archive(args.archive_name, args.restore_path)
        sys.exit(0 if success else 1)


if __name__ == '__main__':
    main()