#!/usr/bin/env python3
import os
import shutil
import json
import hashlib
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, create_repo, list_repo_files, delete_repo
import tarfile
import tempfile
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class HFStorageSync:
    def __init__(self, repo_id, token=None, data_dir="/tmp/open-webui-data", max_backups=3, compression_level=6):
        self.repo_id = repo_id
        self.data_dir = Path(data_dir)
        self.token = token
        self.max_backups = max_backups
        self.compression_level = compression_level

        # Initialize API with token directly
        self.api = HfApi(token=token) if token else HfApi()

        # File patterns for better organization
        self.archive_pattern = "data-{timestamp}.tar.gz"
        self.latest_link = "data-latest.tar.gz"
        self.metadata_file = "storage-metadata.json"

    def _get_directory_hash(self):
        """Calculate hash of directory contents for change detection"""
        hasher = hashlib.sha256()
        if not self.data_dir.exists():
            return hasher.hexdigest()
        for item in sorted(self.data_dir.rglob('*')):
            if item.is_file() and item.name not in [".gitkeep", "test_write"]:
                hasher.update(str(item.relative_to(self.data_dir)).encode())
                hasher.update(str(item.stat().st_mtime).encode())
                hasher.update(str(item.stat().st_size).encode())
        return hasher.hexdigest()

    def _get_archive_size(self, archive_path):
        """Get the size of an archive file"""
        try:
            return os.path.getsize(archive_path)
        except OSError:
            return 0

    def _format_size(self, size_bytes):
        """Format file size in human readable format"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.1f} TB"

    def ensure_repo_exists(self):
        """Create repository if it doesn't exist"""
        if not self.token:
            logger.warning("No token provided, cannot create repository")
            return False
        try:
            # Check if repo exists
            self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
            logger.info(f"Repository {self.repo_id} exists")
            return True
        except Exception:
            logger.info(f"Repository {self.repo_id} not found, attempting to create...")
            try:
                create_repo(
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token,
                    private=True,
                    exist_ok=True
                )
                logger.info(f"Created repository {self.repo_id}")
                # Create initial README and metadata
                self._create_initial_files()
                return True
            except Exception as create_error:
                logger.error(f"Failed to create repository: {create_error}")
                return False

    def _create_initial_files(self):
        """Create initial repository files"""
        readme_content = """# Open WebUI Storage

This dataset stores persistent data for Open WebUI deployment with automatic cleanup and versioning.

## Contents

- `data-latest.tar.gz`: Latest data archive (symlink)
- `data-YYYYMMDD-HHMMSS.tar.gz`: Timestamped data archives
- `storage-metadata.json`: Metadata about stored archives
- `README.md`: This file

## Features

- Automatic cleanup of old backups
- Change detection to avoid unnecessary uploads
- Compression optimization
- Storage usage monitoring

This repository is automatically managed by the Open WebUI sync system.
""" metadata = { "created": datetime.utcnow().isoformat(), "max_backups": self.max_backups, "archives": [], "total_size": 0 } # Upload README with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp: tmp.write(readme_content) tmp.flush() self.api.upload_file( path_or_fileobj=tmp.name, path_in_repo="README.md", repo_id=self.repo_id, repo_type="dataset", commit_message="Initial repository setup", token=self.token ) os.unlink(tmp.name) # Upload metadata with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp: json.dump(metadata, tmp, indent=2) tmp.flush() self.api.upload_file( path_or_fileobj=tmp.name, path_in_repo=self.metadata_file, repo_id=self.repo_id, repo_type="dataset", commit_message="Initial metadata", token=self.token ) os.unlink(tmp.name) def _get_metadata(self): """Download and parse metadata""" try: file_path = self.api.hf_hub_download( repo_id=self.repo_id, filename=self.metadata_file, repo_type="dataset", token=self.token ) with open(file_path, 'r') as f: return json.load(f) except Exception as e: logger.warning(f"Could not load metadata: {e}") return { "created": datetime.utcnow().isoformat(), "max_backups": self.max_backups, "archives": [], "total_size": 0 } def _update_metadata(self, metadata): """Upload updated metadata""" try: with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp: json.dump(metadata, tmp, indent=2) tmp.flush() self.api.upload_file( path_or_fileobj=tmp.name, path_in_repo=self.metadata_file, repo_id=self.repo_id, repo_type="dataset", commit_message="Update metadata", token=self.token ) os.unlink(tmp.name) except Exception as e: logger.error(f"Failed to update metadata: {e}") def _cleanup_old_archives(self, metadata): """Remove old archives beyond max_backups limit""" if len(metadata["archives"]) <= self.max_backups: return metadata # Sort by timestamp, keep newest archives = sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True) to_keep = archives[:self.max_backups] to_delete = archives[self.max_backups:] total_deleted_size = 0 for archive in to_delete: try: self.api.delete_file( path_in_repo=archive["filename"], repo_id=self.repo_id, repo_type="dataset", token=self.token ) total_deleted_size += archive["size"] logger.info(f"Deleted old archive: {archive['filename']} ({self._format_size(archive['size'])})") except Exception as e: logger.warning(f"Failed to delete {archive['filename']}: {e}") metadata["archives"] = to_keep metadata["total_size"] -= total_deleted_size if total_deleted_size > 0: logger.info(f"Cleaned up {self._format_size(total_deleted_size)} of storage") return metadata def download_data(self): """Download and extract latest data from HF dataset repo""" try: logger.info("Downloading data from Hugging Face...") # Ensure data directory exists and is writable self.data_dir.mkdir(parents=True, exist_ok=True) # Test write permissions test_file = self.data_dir / "test_write" try: test_file.touch() test_file.unlink() logger.info(f"Data directory {self.data_dir} is writable") except Exception as e: logger.warning(f"Data directory may not be writable: {e}") return if not self.token: logger.warning("No HF_TOKEN provided, skipping download") return # Ensure repository exists if not self.ensure_repo_exists(): logger.error("Could not access or create repository") return # Try to download the latest data archive try: metadata = self._get_metadata() current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token) current_files = [ i for i in 
                current_files = sorted(current_files)
                logger.info(f"Found {len(current_files)} data archives in the repository.")
                archive_filenames = [i["filename"] for i in metadata["archives"]]
                logger.info(f"Archive filenames: {archive_filenames}")
                logger.info(f"Current filenames: {current_files}")
                latest_link = self.latest_link
                logger.info(f"Current latest_link: {latest_link}")
                if latest_link not in current_files:
                    if len(current_files) > 0:
                        latest_link = current_files[-1]
                        logger.info(f"Latest link not found, falling back to last current file: {latest_link}")
                    else:
                        logger.error("No archives found in repository")
                        return

                logger.info(f"Downloading latest data archive: {latest_link}")
                # First try the latest link
                file_path = self.api.hf_hub_download(
                    repo_id=self.repo_id,
                    filename=latest_link,
                    repo_type="dataset",
                    token=self.token
                )
                with tarfile.open(file_path, 'r:gz') as tar:
                    tar.extractall(self.data_dir)
                archive_size = self._get_archive_size(file_path)
                logger.info(f"Data extracted to {self.data_dir} ({self._format_size(archive_size)})")
            except Exception as e:
                logger.info(f"No existing data found (normal for first run): {e}")
        except Exception as e:
            logger.error(f"Error during download: {e}")

    def upload_data(self, force=False):
        """Compress and upload data to HF dataset repo with change detection"""
        try:
            if not self.token:
                logger.warning("No HF_TOKEN provided, skipping upload")
                return

            if not self.data_dir.exists() or not any(self.data_dir.iterdir()):
                logger.warning("No data to upload")
                return

            # Calculate current directory hash
            current_hash = self._get_directory_hash()

            # Get metadata to check for changes
            metadata = self._get_metadata()
            last_hash = metadata.get("last_hash")

            if not force and current_hash == last_hash:
                logger.info("No changes detected, skipping upload")
                return

            logger.info("Changes detected, preparing upload...")

            # Ensure repository exists
            if not self.ensure_repo_exists():
                logger.error("Could not access or create repository")
                return

            # Create timestamped filename
            timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
            archive_filename = self.archive_pattern.format(timestamp=timestamp)

            # Create temporary archive with optimized compression
            with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp:
                with tarfile.open(tmp.name, 'w:gz', compresslevel=self.compression_level) as tar:
                    total_files = 0
                    for item in self.data_dir.iterdir():
                        if item.name not in ["test_write", ".gitkeep"]:
                            tar.add(item, arcname=item.name)
                            if item.is_file():
                                total_files += 1
                            elif item.is_dir():
                                total_files += sum(1 for _f in item.rglob('*') if _f.is_file())

                archive_size = self._get_archive_size(tmp.name)
                logger.info(f"Created archive: {self._format_size(archive_size)}, {total_files} files")

                # Upload timestamped archive
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=archive_filename,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update Open WebUI data - {timestamp}",
                    token=self.token
                )

                # Upload as latest (overwrite)
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=self.latest_link,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update latest data - {timestamp}",
                    token=self.token
                )

            # Clean up temp file
            os.unlink(tmp.name)

            # Update metadata
            archive_info = {
                "filename": archive_filename,
                "timestamp": timestamp,
                "size": archive_size,
                "hash": current_hash,
                "files_count": total_files
            }
            metadata["archives"].append(archive_info)
            metadata["total_size"] += archive_size
            metadata["last_hash"] = current_hash
            metadata["last_upload"] = datetime.utcnow().isoformat()

            # Cleanup old archives
            metadata = self._cleanup_old_archives(metadata)

            # Update metadata
            self._update_metadata(metadata)

            logger.info(f"Upload successful: {archive_filename} ({self._format_size(archive_size)})")
            logger.info(f"Total storage used: {self._format_size(metadata['total_size'])}")

            # Remove old commit history
            self.prune_repo_history()

        except Exception as e:
            logger.error(f"Error uploading data: {e}")

    def list_archives(self):
        """List all available archives"""
        try:
            metadata = self._get_metadata()
            if not metadata["archives"]:
                logger.info("No archives found")
                return

            logger.info("Available archives:")
            logger.info("-" * 60)
            total_size = 0
            for archive in sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True):
                size_str = self._format_size(archive["size"])
                files_count = archive.get("files_count", "unknown")
                logger.info(f"{archive['filename']:<30} {size_str:>10} {files_count:>6} files")
                total_size += archive["size"]
            logger.info("-" * 60)
            logger.info(f"Total: {len(metadata['archives'])} archives, {self._format_size(total_size)}")
        except Exception as e:
            logger.error(f"Error listing archives: {e}")

    def cleanup_storage(self):
        """Force cleanup of old archives"""
        try:
            metadata = self._get_metadata()
            old_size = metadata["total_size"]
            metadata = self._cleanup_old_archives(metadata)
            self._update_metadata(metadata)
            saved = old_size - metadata["total_size"]
            if saved > 0:
                logger.info(f"Cleanup completed. Saved {self._format_size(saved)} of storage")
            else:
                logger.info("No cleanup needed")
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")
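
    # Note: a possibly less destructive alternative to the delete/re-create cycle
    # implemented below is the Hub's history-squashing endpoint. Recent versions of
    # huggingface_hub expose it as HfApi.super_squash_history(), which collapses all
    # commits on a branch into one while keeping the repository itself intact.
    # A minimal sketch, assuming the method exists in your installed version:
    #
    #   self.api.super_squash_history(repo_id=self.repo_id, repo_type="dataset")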

    # <<< NEW FUNCTION START >>>
    def prune_repo_history(self):
        """
        DANGEROUS: Deletes and recreates the repo to purge git history and reduce storage.
        This will erase all old versions, keeping only the current files.
        """
        logger.warning("=" * 60)
        logger.warning("!!! DESTRUCTIVE OPERATION INITIATED: PRUNE REPO HISTORY !!!")
        logger.warning(f"This will permanently delete the repository '{self.repo_id}' and its entire Git history.")
        logger.warning("Only the most recent versions of files will be preserved.")
        logger.warning("=" * 60)

        try:
            # 1. Get the list of current files to preserve
            logger.info("Step 1/5: Listing current files in the repository...")
            current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token)
            if not current_files:
                logger.warning("Repository is empty or inaccessible. Nothing to prune.")
                return

            metadata = self._get_metadata()
            logger.info(f"Found {len(current_files)} files to preserve.")
            archive_filenames = [i["filename"] for i in metadata["archives"]]
            archive_filenames.append(self.latest_link)  # Also preserve latest link
            logger.info(f"Archive filenames: {archive_filenames}")

            # 2. Download all current files to a temporary directory
            with tempfile.TemporaryDirectory() as tmpdir:
                logger.info("Step 2/5: Downloading current files to a temporary location...")
                for file_path in current_files:
                    if file_path.startswith("data-") and file_path not in archive_filenames:
                        logger.info(f"Skipping {file_path} (not in archive)")
                        continue
                    self.api.hf_hub_download(
                        repo_id=self.repo_id,
                        filename=file_path,
                        repo_type="dataset",
                        token=self.token,
                        local_dir=tmpdir,
                        local_dir_use_symlinks=False
                    )

                # 3. Delete the entire repository
                logger.warning(f"Step 3/5: Deleting repository '{self.repo_id}'...")
                delete_repo(self.repo_id, repo_type="dataset", token=self.token)
                logger.info("Repository deleted successfully.")

                # 4. Re-create the repository (now empty with no history)
                logger.info(f"Step 4/5: Re-creating repository '{self.repo_id}'...")
                self.ensure_repo_exists()  # This will create it and initial files
                logger.info("Repository re-created successfully.")

                # 5. Upload the preserved files back to the new repository
                logger.info("Step 5/5: Uploading preserved files to the new repository...")
                self.api.upload_folder(
                    folder_path=tmpdir,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token,
                    commit_message="Repo history pruned, restoring current files"
                )

            logger.info("=" * 60)
            logger.info("Repository history prune complete. Storage usage has been reset.")
            logger.info("=" * 60)

        except Exception as e:
            logger.error(f"An error occurred during the prune operation: {e}")
            logger.error("The repository may be in an inconsistent state. Please check the Hugging Face Hub.")
    # <<< NEW FUNCTION END >>>


def main():
    import sys

    repo_id = os.getenv("HF_STORAGE_REPO", "nxdev-org/open-webui-storage")
    token = os.getenv("HF_TOKEN")
    data_dir = os.getenv("DATA_DIR", "/tmp/open-webui-data")
    max_backups = int(os.getenv("MAX_BACKUPS", "3"))

    sync = HFStorageSync(repo_id, token, data_dir, max_backups=max_backups)

    if len(sys.argv) > 1:
        command = sys.argv[1].lower()
        force = "--force" in sys.argv

        if command == "download":
            sync.download_data()
        elif command == "upload":
            sync.upload_data(force=force)
        elif command == "list":
            sync.list_archives()
        elif command == "cleanup":
            sync.cleanup_storage()
        # <<< NEW COMMAND HANDLING START >>>
        elif command == "prune-history":
            if not force:
                print("ERROR: This is a destructive operation that will delete all backup history.")
                print("Please use 'prune-history --force' to confirm you want to proceed.")
                sys.exit(1)
            sync.prune_repo_history()
        # <<< NEW COMMAND HANDLING END >>>
        else:
            print(f"Unknown command: {command}")
            print_usage()
    else:
        print_usage()


def print_usage():
    print("Usage: sync_storage.py [command]")
    print("\nCommands:")
    print("  download                Download and extract latest data")
    print("  upload [--force]        Upload data (use --force to ignore change detection)")
    print("  list                    List all available archives")
    print("  cleanup                 Force cleanup of old archives based on MAX_BACKUPS")
    print("  prune-history --force   DANGEROUS: Deletes and recreates the repo to clear all git history.")
    print("\nEnvironment variables:")
    print("  HF_STORAGE_REPO   Hugging Face repository ID")
    print("  HF_TOKEN          Hugging Face API token")
    print("  DATA_DIR          Local data directory")
    print("  MAX_BACKUPS       Maximum number of backups to keep (default: 3)")


if __name__ == "__main__":
    main()
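
# Example invocations (a usage sketch; the token value and repository ID shown
# here are placeholders, not values taken from this script):
#
#   export HF_TOKEN=hf_xxxxxxxxxxxxxxxx
#   export HF_STORAGE_REPO=your-org/open-webui-storage
#   export DATA_DIR=/tmp/open-webui-data
#   export MAX_BACKUPS=3
#
#   python sync_storage.py download                 # restore data on container start
#   python sync_storage.py upload                   # upload only when changes are detected
#   python sync_storage.py upload --force           # upload unconditionally
#   python sync_storage.py list                     # show stored archives and their sizes
#   python sync_storage.py cleanup                  # trim archives beyond MAX_BACKUPS
#   python sync_storage.py prune-history --force    # DESTRUCTIVE: reset repo history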