#!/usr/bin/env python3
import hashlib
import json
import logging
import os
import sys
import tarfile
import tempfile
from datetime import datetime, timezone
from pathlib import Path

from huggingface_hub import HfApi, create_repo, hf_hub_download

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class HFStorageSync:
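    """Sync a local data directory with a Hugging Face dataset repository.

    The directory is packed into timestamped ``data-YYYYMMDD-HHMMSS.tar.gz``
    archives; a metadata hash is used to skip uploads when nothing changed,
    and archives beyond ``max_backups`` are pruned automatically.

    Minimal usage sketch (the repo id below is a placeholder):

        sync = HFStorageSync("your-org/your-storage", token=os.environ.get("HF_TOKEN"))
        sync.download_data()   # restore the latest archive into data_dir
        sync.upload_data()     # re-archive and upload only if changed
    """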

    def __init__(self, repo_id, token=None, data_dir="/tmp/open-webui-data",
                 max_backups=3, compression_level=6):
        self.repo_id = repo_id
        self.data_dir = Path(data_dir)
        self.token = token
        self.max_backups = max_backups
        self.compression_level = compression_level
        # Initialize API with token directly
        self.api = HfApi(token=token) if token else HfApi()
        # File patterns for better organization
        self.archive_pattern = "data-{timestamp}.tar.gz"
        self.latest_link = "data-latest.tar.gz"
        self.metadata_file = "storage-metadata.json"

    def _get_directory_hash(self):
        """Calculate hash of directory contents for change detection"""
        hasher = hashlib.sha256()
        if not self.data_dir.exists():
            return hasher.hexdigest()
        for item in sorted(self.data_dir.rglob('*')):
            if item.is_file() and item.name not in [".gitkeep", "test_write"]:
                stat = item.stat()
                hasher.update(str(item.relative_to(self.data_dir)).encode())
                hasher.update(str(stat.st_mtime).encode())
                hasher.update(str(stat.st_size).encode())
        return hasher.hexdigest()

    def _get_archive_size(self, archive_path):
        """Get the size of an archive file"""
        try:
            return os.path.getsize(archive_path)
        except OSError:
            return 0

    def _format_size(self, size_bytes):
        """Format file size in human-readable format"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.1f} TB"

    def ensure_repo_exists(self):
        """Create repository if it doesn't exist"""
        if not self.token:
            logger.warning("No token provided, cannot create repository")
            return False
        try:
            # Check if repo exists
            self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
            logger.info(f"Repository {self.repo_id} exists")
            return True
        except Exception:
            logger.info(f"Repository {self.repo_id} not found, attempting to create...")
            try:
                create_repo(
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token,
                    private=True,
                    exist_ok=True
                )
                logger.info(f"Created repository {self.repo_id}")
                # Create initial README and metadata
                self._create_initial_files()
                return True
            except Exception as create_error:
                logger.error(f"Failed to create repository: {create_error}")
                return False

    def _create_initial_files(self):
        """Create initial repository files"""
        readme_content = """# Open WebUI Storage

This dataset stores persistent data for an Open WebUI deployment with automatic cleanup and versioning.

## Contents

- `data-latest.tar.gz`: Copy of the newest data archive
- `data-YYYYMMDD-HHMMSS.tar.gz`: Timestamped data archives
- `storage-metadata.json`: Metadata about stored archives
- `README.md`: This file

## Features

- Automatic cleanup of old backups
- Change detection to avoid unnecessary uploads
- Compression optimization
- Storage usage monitoring

This repository is automatically managed by the Open WebUI sync system.
"""
        metadata = {
            "created": datetime.now(timezone.utc).isoformat(),
            "max_backups": self.max_backups,
            "archives": [],
            "total_size": 0
        }
        # Upload README
        with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp:
            tmp.write(readme_content)
            tmp.flush()
            self.api.upload_file(
                path_or_fileobj=tmp.name,
                path_in_repo="README.md",
                repo_id=self.repo_id,
                repo_type="dataset",
                commit_message="Initial repository setup",
                token=self.token
            )
        os.unlink(tmp.name)
        # Upload metadata
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
            json.dump(metadata, tmp, indent=2)
            tmp.flush()
            self.api.upload_file(
                path_or_fileobj=tmp.name,
                path_in_repo=self.metadata_file,
                repo_id=self.repo_id,
                repo_type="dataset",
                commit_message="Initial metadata",
                token=self.token
            )
        os.unlink(tmp.name)

    def _get_metadata(self):
        """Download and parse metadata"""
        try:
            file_path = hf_hub_download(
                repo_id=self.repo_id,
                filename=self.metadata_file,
                repo_type="dataset",
                token=self.token
            )
            with open(file_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            logger.warning(f"Could not load metadata: {e}")
            return {
                "created": datetime.now(timezone.utc).isoformat(),
                "max_backups": self.max_backups,
                "archives": [],
                "total_size": 0
            }

    def _update_metadata(self, metadata):
        """Upload updated metadata"""
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
                json.dump(metadata, tmp, indent=2)
                tmp.flush()
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=self.metadata_file,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message="Update metadata",
                    token=self.token
                )
            os.unlink(tmp.name)
        except Exception as e:
            logger.error(f"Failed to update metadata: {e}")

    def _cleanup_old_archives(self, metadata):
        """Remove old archives beyond max_backups limit"""
        if len(metadata["archives"]) <= self.max_backups:
            return metadata
        # Sort by timestamp, keep newest
        archives = sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True)
        to_keep = archives[:self.max_backups]
        to_delete = archives[self.max_backups:]
        total_deleted_size = 0
        for archive in to_delete:
            try:
                self.api.delete_file(
                    path_in_repo=archive["filename"],
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token
                )
                total_deleted_size += archive["size"]
                logger.info(f"Deleted old archive: {archive['filename']} ({self._format_size(archive['size'])})")
            except Exception as e:
                logger.warning(f"Failed to delete {archive['filename']}: {e}")
        metadata["archives"] = to_keep
        metadata["total_size"] -= total_deleted_size
        if total_deleted_size > 0:
            logger.info(f"Cleaned up {self._format_size(total_deleted_size)} of storage")
        return metadata

    def download_data(self):
        """Download and extract latest data from the HF dataset repo"""
        try:
            logger.info("Downloading data from Hugging Face...")
            # Ensure data directory exists and is writable
            self.data_dir.mkdir(parents=True, exist_ok=True)
            # Test write permissions
            test_file = self.data_dir / "test_write"
            try:
                test_file.touch()
                test_file.unlink()
                logger.info(f"Data directory {self.data_dir} is writable")
            except Exception as e:
                logger.warning(f"Data directory may not be writable: {e}")
                return
            if not self.token:
                logger.warning("No HF_TOKEN provided, skipping download")
                return
            # Ensure repository exists
            if not self.ensure_repo_exists():
                logger.error("Could not access or create repository")
                return
            # Try to download the latest data archive
            try:
                # First try the latest link
                file_path = hf_hub_download(
                    repo_id=self.repo_id,
                    filename=self.latest_link,
                    repo_type="dataset",
                    token=self.token
                )
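                # extractall trusts archive members; acceptable here because we
                # only unpack archives this script created. On Python 3.12+,
                # tar.extractall(self.data_dir, filter="data") would harden this.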
                with tarfile.open(file_path, 'r:gz') as tar:
                    tar.extractall(self.data_dir)
                archive_size = self._get_archive_size(file_path)
                logger.info(f"Data extracted to {self.data_dir} ({self._format_size(archive_size)})")
            except Exception as e:
                logger.info(f"No existing data found (normal for first run): {e}")
        except Exception as e:
            logger.error(f"Error during download: {e}")

    def upload_data(self, force=False):
        """Compress and upload data to the HF dataset repo with change detection"""
        try:
            if not self.token:
                logger.warning("No HF_TOKEN provided, skipping upload")
                return
            if not self.data_dir.exists() or not any(self.data_dir.iterdir()):
                logger.warning("No data to upload")
                return
            # Calculate current directory hash
            current_hash = self._get_directory_hash()
            # Get metadata to check for changes
            metadata = self._get_metadata()
            last_hash = metadata.get("last_hash")
            if not force and current_hash == last_hash:
                logger.info("No changes detected, skipping upload")
                return
            logger.info("Changes detected, preparing upload...")
            # Ensure repository exists
            if not self.ensure_repo_exists():
                logger.error("Could not access or create repository")
                return
            # Create timestamped filename
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
            archive_filename = self.archive_pattern.format(timestamp=timestamp)
            # Create temporary archive with optimized compression
            with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp:
                with tarfile.open(tmp.name, 'w:gz', compresslevel=self.compression_level) as tar:
                    total_files = 0
                    for item in self.data_dir.iterdir():
                        if item.name not in ["test_write", ".gitkeep"]:
                            tar.add(item, arcname=item.name)
                            if item.is_file():
                                total_files += 1
                            elif item.is_dir():
                                total_files += sum(1 for p in item.rglob('*') if p.is_file())
                archive_size = self._get_archive_size(tmp.name)
                logger.info(f"Created archive: {self._format_size(archive_size)}, {total_files} files")
                # Upload timestamped archive
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=archive_filename,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update Open WebUI data - {timestamp}",
                    token=self.token
                )
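                # Uploading the same bytes again under a stable name means
                # download_data() can always fetch "data-latest.tar.gz" directly,
                # without listing and sorting the timestamped archives first.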
                # Upload as latest (overwrite)
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=self.latest_link,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update latest data - {timestamp}",
                    token=self.token
                )
            # Clean up temp file
            os.unlink(tmp.name)
            # Update metadata
            archive_info = {
                "filename": archive_filename,
                "timestamp": timestamp,
                "size": archive_size,
                "hash": current_hash,
                "files_count": total_files
            }
            metadata["archives"].append(archive_info)
            metadata["total_size"] += archive_size
            metadata["last_hash"] = current_hash
            metadata["last_upload"] = datetime.now(timezone.utc).isoformat()
            # Cleanup old archives
            metadata = self._cleanup_old_archives(metadata)
            # Upload the updated metadata
            self._update_metadata(metadata)
            logger.info(f"Upload successful: {archive_filename} ({self._format_size(archive_size)})")
            logger.info(f"Total storage used: {self._format_size(metadata['total_size'])}")
        except Exception as e:
            logger.error(f"Error uploading data: {e}")

    def list_archives(self):
        """List all available archives"""
        try:
            metadata = self._get_metadata()
            if not metadata["archives"]:
                logger.info("No archives found")
                return
            logger.info("Available archives:")
            logger.info("-" * 60)
            total_size = 0
            for archive in sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True):
                size_str = self._format_size(archive["size"])
                files_count = archive.get("files_count", "unknown")
                logger.info(f"{archive['filename']:<30} {size_str:>10} {files_count:>6} files")
                total_size += archive["size"]
            logger.info("-" * 60)
            logger.info(f"Total: {len(metadata['archives'])} archives, {self._format_size(total_size)}")
        except Exception as e:
            logger.error(f"Error listing archives: {e}")

    def cleanup_storage(self):
        """Force cleanup of old archives"""
        try:
            metadata = self._get_metadata()
            old_size = metadata["total_size"]
            metadata = self._cleanup_old_archives(metadata)
            self._update_metadata(metadata)
            saved = old_size - metadata["total_size"]
            if saved > 0:
                logger.info(f"Cleanup completed. Saved {self._format_size(saved)} of storage")
            else:
                logger.info("No cleanup needed")
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")


def main():
    repo_id = os.getenv("HF_STORAGE_REPO", "nxdev-org/open-webui-storage")
    token = os.getenv("HF_TOKEN")
    data_dir = os.getenv("DATA_DIR", "/tmp/open-webui-data")
    max_backups = int(os.getenv("MAX_BACKUPS", "3"))
    sync = HFStorageSync(repo_id, token, data_dir, max_backups=max_backups)
    if len(sys.argv) > 1:
        command = sys.argv[1].lower()
        if command == "download":
            sync.download_data()
        elif command == "upload":
            force = len(sys.argv) > 2 and sys.argv[2] == "--force"
            sync.upload_data(force=force)
        elif command == "list":
            sync.list_archives()
        elif command == "cleanup":
            sync.cleanup_storage()
        else:
            print("Usage: sync_storage.py [download|upload [--force]|list|cleanup]")
    else:
        print("Usage: sync_storage.py [download|upload [--force]|list|cleanup]")
        print("\nCommands:")
        print("  download         Download and extract latest data")
        print("  upload           Upload data (only if changed)")
        print("  upload --force   Force upload even if no changes detected")
        print("  list             List all available archives")
        print("  cleanup          Force cleanup of old archives")
        print("\nEnvironment variables:")
        print("  HF_STORAGE_REPO  Hugging Face repository ID")
        print("  HF_TOKEN         Hugging Face API token")
        print("  DATA_DIR         Local data directory")
        print("  MAX_BACKUPS      Maximum number of backups to keep (default: 3)")
| """ | |
| # Download latest data | |
| python sync_storage.py download | |
| # Upload only if changed (default) | |
| python sync_storage.py upload | |
| # Force upload regardless of changes | |
| python sync_storage.py upload --force | |
| # List all archives | |
| python sync_storage.py list | |
| # Clean up old archives | |
| python sync_storage.py cleanup | |
| # Set custom backup limit | |
| MAX_BACKUPS=5 python sync_storage.py upload | |
| """ | |

if __name__ == "__main__":
    main()