# (Hugging Face Spaces status-banner residue from copy/paste — "Spaces: Paused" — not part of the script.)
#!/usr/bin/env python3
"""Sync Open WebUI persistent data to and from a Hugging Face dataset repository."""
import os
import shutil  # NOTE(review): appears unused in this file — confirm before removing
import json
import hashlib
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, create_repo, list_repo_files, delete_repo
import tarfile
import tempfile
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class HFStorageSync:
    """Back up and restore a local data directory via a Hugging Face dataset repo.

    Data is stored remotely as timestamped ``data-*.tar.gz`` archives plus an
    always-overwritten ``data-latest.tar.gz`` copy; a JSON metadata file tracks
    the archives, their sizes, and a content hash used for change detection.
    """
    def __init__(self, repo_id, token=None, data_dir="/tmp/open-webui-data",
                 max_backups=3, compression_level=6):
        """
        Args:
            repo_id: Hugging Face dataset repository id (e.g. "org/name").
            token: HF API token; without it remote operations are skipped.
            data_dir: Local directory to back up / restore into.
            max_backups: Number of timestamped archives to retain remotely.
            compression_level: gzip level (0-9) used when creating archives.
        """
        self.repo_id = repo_id
        self.data_dir = Path(data_dir)
        self.token = token
        self.max_backups = max_backups
        self.compression_level = compression_level
        # Initialize API with token directly
        self.api = HfApi(token=token) if token else HfApi()
        # File patterns for better organization
        self.archive_pattern = "data-{timestamp}.tar.gz"   # timestamped upload name
        self.latest_link = "data-latest.tar.gz"            # "latest" copy, overwritten each upload
        self.metadata_file = "storage-metadata.json"       # remote metadata filename
| def _get_directory_hash(self): | |
| """Calculate hash of directory contents for change detection""" | |
| hasher = hashlib.sha256() | |
| if not self.data_dir.exists(): | |
| return hasher.hexdigest() | |
| for item in sorted(self.data_dir.rglob('*')): | |
| if item.is_file() and item.name not in [".gitkeep", "test_write"]: | |
| hasher.update(str(item.relative_to(self.data_dir)).encode()) | |
| hasher.update(str(item.stat().st_mtime).encode()) | |
| hasher.update(str(item.stat().st_size).encode()) | |
| return hasher.hexdigest() | |
| def _get_archive_size(self, archive_path): | |
| """Get the size of an archive file""" | |
| try: | |
| return os.path.getsize(archive_path) | |
| except: | |
| return 0 | |
| def _format_size(self, size_bytes): | |
| """Format file size in human readable format""" | |
| for unit in ['B', 'KB', 'MB', 'GB']: | |
| if size_bytes < 1024.0: | |
| return f"{size_bytes:.1f} {unit}" | |
| size_bytes /= 1024.0 | |
| return f"{size_bytes:.1f} TB" | |
| def ensure_repo_exists(self): | |
| """Create repository if it doesn't exist""" | |
| if not self.token: | |
| logger.warning("No token provided, cannot create repository") | |
| return False | |
| try: | |
| # Check if repo exists | |
| repo_info = self.api.repo_info(repo_id=self.repo_id, repo_type="dataset") | |
| logger.info(f"Repository {self.repo_id} exists") | |
| return True | |
| except Exception as e: | |
| logger.info(f"Repository {self.repo_id} not found, attempting to create...") | |
| try: | |
| create_repo( | |
| repo_id=self.repo_id, | |
| repo_type="dataset", | |
| token=self.token, | |
| private=True, | |
| exist_ok=True | |
| ) | |
| logger.info(f"Created repository {self.repo_id}") | |
| # Create initial README and metadata | |
| self._create_initial_files() | |
| return True | |
| except Exception as create_error: | |
| logger.error(f"Failed to create repository: {create_error}") | |
| return False | |
| def _create_initial_files(self): | |
| """Create initial repository files""" | |
| readme_content = """# Open WebUI Storage | |
| This dataset stores persistent data for Open WebUI deployment with automatic cleanup and versioning. | |
| ## Contents | |
| - `data-latest.tar.gz`: Latest data archive (symlink) | |
| - `data-YYYYMMDD-HHMMSS.tar.gz`: Timestamped data archives | |
| - `storage-metadata.json`: Metadata about stored archives | |
| - `README.md`: This file | |
| ## Features | |
| - Automatic cleanup of old backups | |
| - Change detection to avoid unnecessary uploads | |
| - Compression optimization | |
| - Storage usage monitoring | |
| This repository is automatically managed by the Open WebUI sync system. | |
| """ | |
| metadata = { | |
| "created": datetime.utcnow().isoformat(), | |
| "max_backups": self.max_backups, | |
| "archives": [], | |
| "total_size": 0 | |
| } | |
| # Upload README | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp: | |
| tmp.write(readme_content) | |
| tmp.flush() | |
| self.api.upload_file( | |
| path_or_fileobj=tmp.name, | |
| path_in_repo="README.md", | |
| repo_id=self.repo_id, | |
| repo_type="dataset", | |
| commit_message="Initial repository setup", | |
| token=self.token | |
| ) | |
| os.unlink(tmp.name) | |
| # Upload metadata | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp: | |
| json.dump(metadata, tmp, indent=2) | |
| tmp.flush() | |
| self.api.upload_file( | |
| path_or_fileobj=tmp.name, | |
| path_in_repo=self.metadata_file, | |
| repo_id=self.repo_id, | |
| repo_type="dataset", | |
| commit_message="Initial metadata", | |
| token=self.token | |
| ) | |
| os.unlink(tmp.name) | |
| def _get_metadata(self): | |
| """Download and parse metadata""" | |
| try: | |
| file_path = self.api.hf_hub_download( | |
| repo_id=self.repo_id, | |
| filename=self.metadata_file, | |
| repo_type="dataset", | |
| token=self.token | |
| ) | |
| with open(file_path, 'r') as f: | |
| return json.load(f) | |
| except Exception as e: | |
| logger.warning(f"Could not load metadata: {e}") | |
| return { | |
| "created": datetime.utcnow().isoformat(), | |
| "max_backups": self.max_backups, | |
| "archives": [], | |
| "total_size": 0 | |
| } | |
| def _update_metadata(self, metadata): | |
| """Upload updated metadata""" | |
| try: | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp: | |
| json.dump(metadata, tmp, indent=2) | |
| tmp.flush() | |
| self.api.upload_file( | |
| path_or_fileobj=tmp.name, | |
| path_in_repo=self.metadata_file, | |
| repo_id=self.repo_id, | |
| repo_type="dataset", | |
| commit_message="Update metadata", | |
| token=self.token | |
| ) | |
| os.unlink(tmp.name) | |
| except Exception as e: | |
| logger.error(f"Failed to update metadata: {e}") | |
| def _cleanup_old_archives(self, metadata): | |
| """Remove old archives beyond max_backups limit""" | |
| if len(metadata["archives"]) <= self.max_backups: | |
| return metadata | |
| # Sort by timestamp, keep newest | |
| archives = sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True) | |
| to_keep = archives[:self.max_backups] | |
| to_delete = archives[self.max_backups:] | |
| total_deleted_size = 0 | |
| for archive in to_delete: | |
| try: | |
| self.api.delete_file( | |
| path_in_repo=archive["filename"], | |
| repo_id=self.repo_id, | |
| repo_type="dataset", | |
| token=self.token | |
| ) | |
| total_deleted_size += archive["size"] | |
| logger.info(f"Deleted old archive: {archive['filename']} ({self._format_size(archive['size'])})") | |
| except Exception as e: | |
| logger.warning(f"Failed to delete {archive['filename']}: {e}") | |
| metadata["archives"] = to_keep | |
| metadata["total_size"] -= total_deleted_size | |
| if total_deleted_size > 0: | |
| logger.info(f"Cleaned up {self._format_size(total_deleted_size)} of storage") | |
| return metadata | |
    def download_data(self):
        """Download the newest archive from the HF dataset repo and extract it
        into ``self.data_dir``.

        Best-effort: every failure path logs and returns instead of raising.
        Falls back to the lexicographically last ``data-*`` file when the
        ``data-latest.tar.gz`` pointer is missing.
        """
        try:
            logger.info("Downloading data from Hugging Face...")
            # Ensure data directory exists and is writable
            self.data_dir.mkdir(parents=True, exist_ok=True)
            # Test write permissions
            test_file = self.data_dir / "test_write"
            try:
                test_file.touch()
                test_file.unlink()
                logger.info(f"Data directory {self.data_dir} is writable")
            except Exception as e:
                # Cannot extract into an unwritable directory; give up early.
                logger.warning(f"Data directory may not be writable: {e}")
                return
            if not self.token:
                logger.warning("No HF_TOKEN provided, skipping download")
                return
            # Ensure repository exists
            if not self.ensure_repo_exists():
                logger.error("Could not access or create repository")
                return
            # Try to download the latest data archive
            try:
                metadata = self._get_metadata()
                current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token)
                # Only data-* files are archive candidates; sorted so the
                # lexicographically last one is the newest timestamp.
                current_files = [ i for i in current_files if i.startswith("data-") ]
                current_files = sorted(current_files)
                logger.info(f"Found {len(current_files)} files to preserve.")
                archive_filenames = [i["filename"] for i in metadata["archives"]]
                logger.info(f"Archive filenames: {archive_filenames}")
                logger.info(f"Current filenames: {current_files}")
                latest_link = self.latest_link
                logger.info(f"Current latest_link: {latest_link}")
                # Fall back to the newest timestamped archive when the
                # "latest" pointer file is absent from the repo.
                if (latest_link not in current_files):
                    if (len(current_files) > 0):
                        latest_link = current_files[-1]
                        logger.info(f"Latest link not found, falling back to last current file: {latest_link}")
                    else:
                        logger.error("No archives found in repository")
                        return
                logger.info(f"Downloading latest data archive: {latest_link}")
                # First try the latest link
                file_path = self.api.hf_hub_download(
                    repo_id=self.repo_id,
                    filename=latest_link,
                    repo_type="dataset",
                    token=self.token
                )
                # NOTE(review): extractall() without a `filter` argument is
                # vulnerable to path traversal if the archive were tampered
                # with — consider filter="data" (Python 3.12+); confirm the
                # deployment's Python version first.
                with tarfile.open(file_path, 'r:gz') as tar:
                    tar.extractall(self.data_dir)
                archive_size = self._get_archive_size(file_path)
                logger.info(f"Data extracted to {self.data_dir} ({self._format_size(archive_size)})")
            except Exception as e:
                # Missing archive on first run is expected, so log at info.
                logger.info(f"No existing data found (normal for first run): {e}")
        except Exception as e:
            logger.error(f"Error during download: {e}")
    def upload_data(self, force=False):
        """Compress ``self.data_dir`` and upload it to the HF dataset repo.

        Skips the upload when the directory hash matches the last uploaded
        hash, unless *force* is True. On success, uploads both a timestamped
        archive and the ``data-latest.tar.gz`` copy, updates metadata, applies
        the retention policy, and then prunes the repo's git history.

        Args:
            force: Upload even when no changes were detected.
        """
        try:
            if not self.token:
                logger.warning("No HF_TOKEN provided, skipping upload")
                return
            if not self.data_dir.exists() or not any(self.data_dir.iterdir()):
                logger.warning("No data to upload")
                return
            # Calculate current directory hash
            current_hash = self._get_directory_hash()
            # Get metadata to check for changes
            metadata = self._get_metadata()
            last_hash = metadata.get("last_hash")
            if not force and current_hash == last_hash:
                logger.info("No changes detected, skipping upload")
                return
            logger.info("Changes detected, preparing upload...")
            # Ensure repository exists
            if not self.ensure_repo_exists():
                logger.error("Could not access or create repository")
                return
            # Create timestamped filename
            timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
            archive_filename = self.archive_pattern.format(timestamp=timestamp)
            # Create temporary archive with optimized compression
            with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp:
                with tarfile.open(tmp.name, f'w:gz', compresslevel=self.compression_level) as tar:
                    total_files = 0
                    for item in self.data_dir.iterdir():
                        # Exclude our own marker/scratch files from the backup.
                        if item.name not in ["test_write", ".gitkeep"]:
                            tar.add(item, arcname=item.name)
                            if item.is_file():
                                total_files += 1
                            elif item.is_dir():
                                total_files += sum(1 for _f in item.rglob('*') if _f.is_file())
                archive_size = self._get_archive_size(tmp.name)
                logger.info(f"Created archive: {self._format_size(archive_size)}, {total_files} files")
                # Upload timestamped archive
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=archive_filename,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update Open WebUI data - {timestamp}",
                    token=self.token
                )
                # Upload as latest (overwrite)
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=self.latest_link,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update latest data - {timestamp}",
                    token=self.token
                )
                # Clean up temp file
                # NOTE(review): if an upload above raises, this unlink is
                # skipped and the temp archive leaks — consider try/finally.
                os.unlink(tmp.name)
            # Update metadata
            archive_info = {
                "filename": archive_filename,
                "timestamp": timestamp,
                "size": archive_size,
                "hash": current_hash,
                "files_count": total_files
            }
            metadata["archives"].append(archive_info)
            metadata["total_size"] += archive_size
            metadata["last_hash"] = current_hash
            metadata["last_upload"] = datetime.utcnow().isoformat()
            # Cleanup old archives
            metadata = self._cleanup_old_archives(metadata)
            # Update metadata
            self._update_metadata(metadata)
            logger.info(f"Upload successful: {archive_filename} ({self._format_size(archive_size)})")
            logger.info(f"Total storage used: {self._format_size(metadata['total_size'])}")
            # Remove old commit history
            # NOTE(review): this deletes and recreates the repo on EVERY
            # upload (see prune_repo_history) — confirm that is intended.
            self.prune_repo_history()
        except Exception as e:
            logger.error(f"Error uploading data: {e}")
| def list_archives(self): | |
| """List all available archives""" | |
| try: | |
| metadata = self._get_metadata() | |
| if not metadata["archives"]: | |
| logger.info("No archives found") | |
| return | |
| logger.info("Available archives:") | |
| logger.info("-" * 60) | |
| total_size = 0 | |
| for archive in sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True): | |
| size_str = self._format_size(archive["size"]) | |
| files_count = archive.get("files_count", "unknown") | |
| logger.info(f"{archive['filename']:<30} {size_str:>10} {files_count:>6} files") | |
| total_size += archive["size"] | |
| logger.info("-" * 60) | |
| logger.info(f"Total: {len(metadata['archives'])} archives, {self._format_size(total_size)}") | |
| except Exception as e: | |
| logger.error(f"Error listing archives: {e}") | |
| def cleanup_storage(self): | |
| """Force cleanup of old archives""" | |
| try: | |
| metadata = self._get_metadata() | |
| old_size = metadata["total_size"] | |
| metadata = self._cleanup_old_archives(metadata) | |
| self._update_metadata(metadata) | |
| saved = old_size - metadata["total_size"] | |
| if saved > 0: | |
| logger.info(f"Cleanup completed. Saved {self._format_size(saved)} of storage") | |
| else: | |
| logger.info("No cleanup needed") | |
| except Exception as e: | |
| logger.error(f"Error during cleanup: {e}") | |
    # <<< NEW FUNCTION START >>>
    def prune_repo_history(self):
        """
        DANGEROUS: Deletes and recreates the repo to purge git history and reduce storage.
        This will erase all old versions, keeping only the current files.

        Sequence: snapshot current files locally, delete the repo, recreate it,
        re-upload the snapshot. NOTE(review): there is a window between repo
        deletion and re-upload where the only copy of the data is the local
        temp directory — a crash there loses everything remote; confirm this
        risk is acceptable before relying on it.
        """
        logger.warning("="*60)
        logger.warning("!!! DESTRUCTIVE OPERATION INITIATED: PRUNE REPO HISTORY !!!")
        logger.warning(f"This will permanently delete the repository '{self.repo_id}' and its entire Git history.")
        logger.warning("Only the most recent versions of files will be preserved.")
        logger.warning("="*60)
        try:
            # 1. Get the list of current files to preserve
            logger.info("Step 1/5: Listing current files in the repository...")
            current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token)
            if not current_files:
                logger.warning("Repository is empty or inaccessible. Nothing to prune.")
                return
            metadata = self._get_metadata()
            logger.info(f"Found {len(current_files)} files to preserve.")
            archive_filenames = [i["filename"] for i in metadata["archives"]]
            archive_filenames.append(self.latest_link)  # Also preserve latest link
            logger.info(f"Archive filenames: {archive_filenames}")
            # 2. Download all current files to a temporary directory
            with tempfile.TemporaryDirectory() as tmpdir:
                logger.info(f"Step 2/5: Downloading current files to a temporary location...")
                for file_path in current_files:
                    # Skip data-* files that the metadata no longer tracks —
                    # they were orphaned and should not survive the prune.
                    if file_path.startswith("data-") and (file_path not in archive_filenames):
                        logger.info(f"Skipping {file_path} (not in archive)")
                        continue
                    self.api.hf_hub_download(
                        repo_id=self.repo_id,
                        filename=file_path,
                        repo_type="dataset",
                        token=self.token,
                        local_dir=tmpdir,
                        local_dir_use_symlinks=False
                    )
                # 3. Delete the entire repository
                logger.warning(f"Step 3/5: Deleting repository '{self.repo_id}'...")
                delete_repo(self.repo_id, repo_type="dataset", token=self.token)
                logger.info("Repository deleted successfully.")
                # 4. Re-create the repository (now empty with no history)
                logger.info(f"Step 4/5: Re-creating repository '{self.repo_id}'...")
                self.ensure_repo_exists()  # This will create it and initial files
                logger.info("Repository re-created successfully.")
                # 5. Upload the preserved files back to the new repository
                logger.info("Step 5/5: Uploading preserved files to the new repository...")
                self.api.upload_folder(
                    folder_path=tmpdir,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token,
                    commit_message="Repo history pruned, restoring current files"
                )
            logger.info("="*60)
            logger.info("Repository history prune complete. Storage usage has been reset.")
            logger.info("="*60)
        except Exception as e:
            logger.error(f"An error occurred during the prune operation: {e}")
            logger.error("The repository may be in an inconsistent state. Please check the Hugging Face Hub.")
    # <<< NEW FUNCTION END >>>
def main():
    """CLI entry point: read configuration from the environment and dispatch
    the command given on the command line."""
    import sys
    repo_id = os.getenv("HF_STORAGE_REPO", "nxdev-org/open-webui-storage")
    token = os.getenv("HF_TOKEN")
    data_dir = os.getenv("DATA_DIR", "/tmp/open-webui-data")
    max_backups = int(os.getenv("MAX_BACKUPS", "3"))
    sync = HFStorageSync(repo_id, token, data_dir, max_backups=max_backups)
    # Guard clause: no command given, just show usage.
    if len(sys.argv) <= 1:
        print_usage()
        return
    command = sys.argv[1].lower()
    force = "--force" in sys.argv
    if command == "download":
        sync.download_data()
    elif command == "upload":
        sync.upload_data(force=force)
    elif command == "list":
        sync.list_archives()
    elif command == "cleanup":
        sync.cleanup_storage()
    # <<< NEW COMMAND HANDLING START >>>
    elif command == "prune-history":
        # Destructive operation: require an explicit --force confirmation.
        if not force:
            print("ERROR: This is a destructive operation that will delete all backup history.")
            print("Please use 'prune-history --force' to confirm you want to proceed.")
            sys.exit(1)
        sync.prune_repo_history()
    # <<< NEW COMMAND HANDLING END >>>
    else:
        print(f"Unknown command: {command}")
        print_usage()
def print_usage():
    """Print CLI usage information to stdout."""
    usage_lines = [
        "Usage: sync_storage.py [command]",
        "\nCommands:",
        " download Download and extract latest data",
        " upload [--force] Upload data (use --force to ignore change detection)",
        " list List all available archives",
        " cleanup Force cleanup of old archives based on MAX_BACKUPS",
        " prune-history --force DANGEROUS: Deletes and recreates the repo to clear all git history.",
        "\nEnvironment variables:",
        " HF_STORAGE_REPO Hugging Face repository ID",
        " HF_TOKEN Hugging Face API token",
        " DATA_DIR Local data directory",
        " MAX_BACKUPS Maximum number of backups to keep (default: 3)",
    ]
    for line in usage_lines:
        print(line)
# Script entry point: dispatch CLI commands via main().
if __name__ == "__main__":
    main()