# open-webui / sync_storage.py
# Author: wwforonce — "fix download" (commit 8bf0297)
#!/usr/bin/env python3
import os
import shutil
import json
import hashlib
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, create_repo, list_repo_files, delete_repo
import tarfile
import tempfile
import logging
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class HFStorageSync:
    """Synchronise a local data directory with a Hugging Face dataset repo.

    Keeps timestamped tar.gz backups plus a rolling "latest" archive and a
    JSON manifest, with retention limited to ``max_backups`` entries.
    """

    def __init__(self, repo_id, token=None, data_dir="/tmp/open-webui-data",
                 max_backups=3, compression_level=6):
        self.repo_id = repo_id
        self.token = token
        self.data_dir = Path(data_dir)
        self.max_backups = max_backups
        self.compression_level = compression_level
        # Authenticated client when a token is supplied, anonymous otherwise.
        self.api = HfApi(token=token) if token else HfApi()
        # Naming conventions for the artifacts stored in the repo.
        self.archive_pattern = "data-{timestamp}.tar.gz"
        self.latest_link = "data-latest.tar.gz"
        self.metadata_file = "storage-metadata.json"
def _get_directory_hash(self):
"""Calculate hash of directory contents for change detection"""
hasher = hashlib.sha256()
if not self.data_dir.exists():
return hasher.hexdigest()
for item in sorted(self.data_dir.rglob('*')):
if item.is_file() and item.name not in [".gitkeep", "test_write"]:
hasher.update(str(item.relative_to(self.data_dir)).encode())
hasher.update(str(item.stat().st_mtime).encode())
hasher.update(str(item.stat().st_size).encode())
return hasher.hexdigest()
def _get_archive_size(self, archive_path):
"""Get the size of an archive file"""
try:
return os.path.getsize(archive_path)
except:
return 0
def _format_size(self, size_bytes):
"""Format file size in human readable format"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
def ensure_repo_exists(self):
    """Ensure the dataset repository exists, creating it if necessary.

    Returns:
        True when the repo exists or was successfully created, False when
        no token is available or creation failed.
    """
    if not self.token:
        logger.warning("No token provided, cannot create repository")
        return False
    try:
        # Probe for the repo; raises when it is missing or inaccessible.
        # (Return value intentionally discarded — only existence matters.)
        self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
        logger.info(f"Repository {self.repo_id} exists")
        return True
    except Exception:
        logger.info(f"Repository {self.repo_id} not found, attempting to create...")
    try:
        create_repo(
            repo_id=self.repo_id,
            repo_type="dataset",
            token=self.token,
            private=True,
            exist_ok=True
        )
        logger.info(f"Created repository {self.repo_id}")
        # Seed the new repo with a README and an empty metadata manifest.
        self._create_initial_files()
        return True
    except Exception as create_error:
        logger.error(f"Failed to create repository: {create_error}")
        return False
def _create_initial_files(self):
    """Create initial repository files"""
    # README body is flush-left so the rendered markdown has no stray indent.
    readme_content = """# Open WebUI Storage
This dataset stores persistent data for Open WebUI deployment with automatic cleanup and versioning.
## Contents
- `data-latest.tar.gz`: Latest data archive (symlink)
- `data-YYYYMMDD-HHMMSS.tar.gz`: Timestamped data archives
- `storage-metadata.json`: Metadata about stored archives
- `README.md`: This file
## Features
- Automatic cleanup of old backups
- Change detection to avoid unnecessary uploads
- Compression optimization
- Storage usage monitoring
This repository is automatically managed by the Open WebUI sync system.
"""
    # Fresh manifest: no archives recorded yet, zero storage accounted for.
    metadata = {
        "created": datetime.utcnow().isoformat(),
        "max_backups": self.max_backups,
        "archives": [],
        "total_size": 0
    }
    # Upload README
    # delete=False keeps the path valid while upload_file reads it; the
    # file is removed explicitly afterwards (leaks if the upload raises).
    with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp:
        tmp.write(readme_content)
        tmp.flush()
        self.api.upload_file(
            path_or_fileobj=tmp.name,
            path_in_repo="README.md",
            repo_id=self.repo_id,
            repo_type="dataset",
            commit_message="Initial repository setup",
            token=self.token
        )
        os.unlink(tmp.name)
    # Upload metadata
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
        json.dump(metadata, tmp, indent=2)
        tmp.flush()
        self.api.upload_file(
            path_or_fileobj=tmp.name,
            path_in_repo=self.metadata_file,
            repo_id=self.repo_id,
            repo_type="dataset",
            commit_message="Initial metadata",
            token=self.token
        )
        os.unlink(tmp.name)
def _get_metadata(self):
    """Fetch and parse the storage manifest from the repository.

    Any failure (missing file, network error, bad JSON) falls back to a
    fresh, empty manifest — normal on a first run.
    """
    try:
        local_path = self.api.hf_hub_download(
            repo_id=self.repo_id,
            filename=self.metadata_file,
            repo_type="dataset",
            token=self.token,
        )
        with open(local_path, 'r') as fh:
            return json.load(fh)
    except Exception as exc:
        logger.warning(f"Could not load metadata: {exc}")
        return {
            "created": datetime.utcnow().isoformat(),
            "max_backups": self.max_backups,
            "archives": [],
            "total_size": 0,
        }
def _update_metadata(self, metadata):
    """Serialise *metadata* to JSON and push it to the repository."""
    try:
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as handle:
            json.dump(metadata, handle, indent=2)
            handle.flush()  # make sure bytes are on disk before the upload reads them
            self.api.upload_file(
                path_or_fileobj=handle.name,
                path_in_repo=self.metadata_file,
                repo_id=self.repo_id,
                repo_type="dataset",
                commit_message="Update metadata",
                token=self.token,
            )
            os.unlink(handle.name)
    except Exception as exc:
        logger.error(f"Failed to update metadata: {exc}")
def _cleanup_old_archives(self, metadata):
    """Delete archives beyond the retention limit, newest-first kept.

    Mutates and returns *metadata* with the surviving archive list and an
    adjusted total size. Individual delete failures are logged, not fatal.
    """
    if len(metadata["archives"]) <= self.max_backups:
        return metadata
    # Newest first; everything past the retention window gets removed.
    by_age = sorted(metadata["archives"], key=lambda a: a["timestamp"], reverse=True)
    keep = by_age[:self.max_backups]
    drop = by_age[self.max_backups:]
    freed = 0
    for old in drop:
        try:
            self.api.delete_file(
                path_in_repo=old["filename"],
                repo_id=self.repo_id,
                repo_type="dataset",
                token=self.token
            )
            freed += old["size"]
            logger.info(f"Deleted old archive: {old['filename']} ({self._format_size(old['size'])})")
        except Exception as exc:
            logger.warning(f"Failed to delete {old['filename']}: {exc}")
    metadata["archives"] = keep
    metadata["total_size"] -= freed
    if freed > 0:
        logger.info(f"Cleaned up {self._format_size(freed)} of storage")
    return metadata
def download_data(self):
    """Download and extract latest data from HF dataset repo"""
    try:
        logger.info("Downloading data from Hugging Face...")
        # Ensure data directory exists and is writable
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # Test write permissions
        test_file = self.data_dir / "test_write"
        try:
            test_file.touch()
            test_file.unlink()
            logger.info(f"Data directory {self.data_dir} is writable")
        except Exception as e:
            # An unwritable target makes a restore pointless; bail out early.
            logger.warning(f"Data directory may not be writable: {e}")
            return
        if not self.token:
            logger.warning("No HF_TOKEN provided, skipping download")
            return
        # Ensure repository exists
        if not self.ensure_repo_exists():
            logger.error("Could not access or create repository")
            return
        # Try to download the latest data archive
        try:
            metadata = self._get_metadata()
            # Archives follow the "data-*" naming scheme; everything else
            # in the repo (README, metadata) is ignored here.
            current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token)
            current_files = [ i for i in current_files if i.startswith("data-") ]
            current_files = sorted(current_files)
            logger.info(f"Found {len(current_files)} files to preserve.")
            archive_filenames = [i["filename"] for i in metadata["archives"]]
            logger.info(f"Archive filenames: {archive_filenames}")
            logger.info(f"Current filenames: {current_files}")
            latest_link = self.latest_link
            logger.info(f"Current latest_link: {latest_link}")
            if (latest_link not in current_files):
                if (len(current_files) > 0):
                    # Fall back to the lexicographically last timestamped
                    # archive — names sort chronologically by construction.
                    latest_link = current_files[-1]
                    logger.info(f"Latest link not found, falling back to last current file: {latest_link}")
                else:
                    logger.error("No archives found in repository")
                    return
            logger.info(f"Downloading latest data archive: {latest_link}")
            # First try the latest link
            file_path = self.api.hf_hub_download(
                repo_id=self.repo_id,
                filename=latest_link,
                repo_type="dataset",
                token=self.token
            )
            # NOTE(review): extractall() without an extraction filter trusts
            # member paths inside the archive; fine for our own backups, but
            # a crafted tarball could write outside data_dir. Consider
            # tarfile's filter="data" once the runtime is Python 3.12+.
            with tarfile.open(file_path, 'r:gz') as tar:
                tar.extractall(self.data_dir)
            archive_size = self._get_archive_size(file_path)
            logger.info(f"Data extracted to {self.data_dir} ({self._format_size(archive_size)})")
        except Exception as e:
            # Deliberate best-effort: any failure here is treated as
            # "no data yet" rather than a fatal error.
            logger.info(f"No existing data found (normal for first run): {e}")
    except Exception as e:
        logger.error(f"Error during download: {e}")
def upload_data(self, force=False):
    """Compress and upload data to HF dataset repo with change detection"""
    try:
        if not self.token:
            logger.warning("No HF_TOKEN provided, skipping upload")
            return
        if not self.data_dir.exists() or not any(self.data_dir.iterdir()):
            logger.warning("No data to upload")
            return
        # Calculate current directory hash
        current_hash = self._get_directory_hash()
        # Get metadata to check for changes
        metadata = self._get_metadata()
        last_hash = metadata.get("last_hash")
        # Skip the whole upload when nothing changed since the last sync,
        # unless the caller forces it.
        if not force and current_hash == last_hash:
            logger.info("No changes detected, skipping upload")
            return
        logger.info("Changes detected, preparing upload...")
        # Ensure repository exists
        if not self.ensure_repo_exists():
            logger.error("Could not access or create repository")
            return
        # Create timestamped filename
        timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
        archive_filename = self.archive_pattern.format(timestamp=timestamp)
        # Create temporary archive with optimized compression
        with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp:
            with tarfile.open(tmp.name, f'w:gz', compresslevel=self.compression_level) as tar:
                total_files = 0
                for item in self.data_dir.iterdir():
                    # Skip sync bookkeeping files at the top level.
                    if item.name not in ["test_write", ".gitkeep"]:
                        tar.add(item, arcname=item.name)
                        if item.is_file():
                            total_files += 1
                        elif item.is_dir():
                            total_files += sum(1 for _f in item.rglob('*') if _f.is_file())
            archive_size = self._get_archive_size(tmp.name)
            logger.info(f"Created archive: {self._format_size(archive_size)}, {total_files} files")
            # Upload timestamped archive
            self.api.upload_file(
                path_or_fileobj=tmp.name,
                path_in_repo=archive_filename,
                repo_id=self.repo_id,
                repo_type="dataset",
                commit_message=f"Update Open WebUI data - {timestamp}",
                token=self.token
            )
            # Upload as latest (overwrite)
            self.api.upload_file(
                path_or_fileobj=tmp.name,
                path_in_repo=self.latest_link,
                repo_id=self.repo_id,
                repo_type="dataset",
                commit_message=f"Update latest data - {timestamp}",
                token=self.token
            )
            # Clean up temp file
            os.unlink(tmp.name)
        # Update metadata
        archive_info = {
            "filename": archive_filename,
            "timestamp": timestamp,
            "size": archive_size,
            "hash": current_hash,
            "files_count": total_files
        }
        metadata["archives"].append(archive_info)
        metadata["total_size"] += archive_size
        metadata["last_hash"] = current_hash
        metadata["last_upload"] = datetime.utcnow().isoformat()
        # Cleanup old archives
        metadata = self._cleanup_old_archives(metadata)
        # Update metadata
        self._update_metadata(metadata)
        logger.info(f"Upload successful: {archive_filename} ({self._format_size(archive_size)})")
        logger.info(f"Total storage used: {self._format_size(metadata['total_size'])}")
        # Remove old commit history
        # NOTE(review): this purges the repo's git history after EVERY
        # successful upload (delete + recreate repo) — deliberate, to cap
        # Hub storage, but destructive; see prune_repo_history.
        self.prune_repo_history()
    except Exception as e:
        logger.error(f"Error uploading data: {e}")
def list_archives(self):
    """Log a formatted table of every archive recorded in the manifest."""
    try:
        metadata = self._get_metadata()
        entries = metadata["archives"]
        if not entries:
            logger.info("No archives found")
            return
        logger.info("Available archives:")
        logger.info("-" * 60)
        grand_total = 0
        # Newest first, matching the retention policy's ordering.
        for entry in sorted(entries, key=lambda a: a["timestamp"], reverse=True):
            size_str = self._format_size(entry["size"])
            files_count = entry.get("files_count", "unknown")
            logger.info(f"{entry['filename']:<30} {size_str:>10} {files_count:>6} files")
            grand_total += entry["size"]
        logger.info("-" * 60)
        logger.info(f"Total: {len(entries)} archives, {self._format_size(grand_total)}")
    except Exception as exc:
        logger.error(f"Error listing archives: {exc}")
def cleanup_storage(self):
    """Apply the retention policy immediately and report space reclaimed."""
    try:
        metadata = self._get_metadata()
        size_before = metadata["total_size"]
        metadata = self._cleanup_old_archives(metadata)
        self._update_metadata(metadata)
        freed = size_before - metadata["total_size"]
        if freed > 0:
            logger.info(f"Cleanup completed. Saved {self._format_size(freed)} of storage")
        else:
            logger.info("No cleanup needed")
    except Exception as exc:
        logger.error(f"Error during cleanup: {exc}")
# <<< NEW FUNCTION START >>>
def prune_repo_history(self):
    """
    DANGEROUS: Deletes and recreates the repo to purge git history and reduce storage.
    This will erase all old versions, keeping only the current files.
    """
    logger.warning("="*60)
    logger.warning("!!! DESTRUCTIVE OPERATION INITIATED: PRUNE REPO HISTORY !!!")
    logger.warning(f"This will permanently delete the repository '{self.repo_id}' and its entire Git history.")
    logger.warning("Only the most recent versions of files will be preserved.")
    logger.warning("="*60)
    try:
        # 1. Get the list of current files to preserve
        logger.info("Step 1/5: Listing current files in the repository...")
        current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token)
        if not current_files:
            logger.warning("Repository is empty or inaccessible. Nothing to prune.")
            return
        metadata = self._get_metadata()
        logger.info(f"Found {len(current_files)} files to preserve.")
        # Only archives listed in the manifest (plus the latest link) are
        # preserved; stray data-* files are dropped during the rebuild.
        archive_filenames = [i["filename"] for i in metadata["archives"]]
        archive_filenames.append(self.latest_link)  # Also preserve latest link
        logger.info(f"Archive filenames: {archive_filenames}")
        # 2. Download all current files to a temporary directory
        with tempfile.TemporaryDirectory() as tmpdir:
            logger.info(f"Step 2/5: Downloading current files to a temporary location...")
            for file_path in current_files:
                if file_path.startswith("data-") and (file_path not in archive_filenames):
                    logger.info(f"Skipping {file_path} (not in archive)")
                    continue
                # NOTE(review): local_dir_use_symlinks is deprecated in
                # newer huggingface_hub releases — confirm against the
                # pinned library version.
                self.api.hf_hub_download(
                    repo_id=self.repo_id,
                    filename=file_path,
                    repo_type="dataset",
                    token=self.token,
                    local_dir=tmpdir,
                    local_dir_use_symlinks=False
                )
            # 3. Delete the entire repository
            # NOTE(review): if the process dies between delete and
            # re-create, the only copy of the data is this ephemeral
            # temp directory — there is a real data-loss window here.
            logger.warning(f"Step 3/5: Deleting repository '{self.repo_id}'...")
            delete_repo(self.repo_id, repo_type="dataset", token=self.token)
            logger.info("Repository deleted successfully.")
            # 4. Re-create the repository (now empty with no history)
            logger.info(f"Step 4/5: Re-creating repository '{self.repo_id}'...")
            self.ensure_repo_exists()  # This will create it and initial files
            logger.info("Repository re-created successfully.")
            # 5. Upload the preserved files back to the new repository
            logger.info("Step 5/5: Uploading preserved files to the new repository...")
            self.api.upload_folder(
                folder_path=tmpdir,
                repo_id=self.repo_id,
                repo_type="dataset",
                token=self.token,
                commit_message="Repo history pruned, restoring current files"
            )
        logger.info("="*60)
        logger.info("Repository history prune complete. Storage usage has been reset.")
        logger.info("="*60)
    except Exception as e:
        logger.error(f"An error occurred during the prune operation: {e}")
        logger.error("The repository may be in an inconsistent state. Please check the Hugging Face Hub.")
# <<< NEW FUNCTION END >>>
def main():
    """CLI entry point: build a sync client from the environment and dispatch."""
    import sys
    repo_id = os.getenv("HF_STORAGE_REPO", "nxdev-org/open-webui-storage")
    token = os.getenv("HF_TOKEN")
    data_dir = os.getenv("DATA_DIR", "/tmp/open-webui-data")
    max_backups = int(os.getenv("MAX_BACKUPS", "3"))
    sync = HFStorageSync(repo_id, token, data_dir, max_backups=max_backups)
    # No command given: show help and stop.
    if len(sys.argv) <= 1:
        print_usage()
        return
    command = sys.argv[1].lower()
    force = "--force" in sys.argv
    if command == "download":
        sync.download_data()
    elif command == "upload":
        sync.upload_data(force=force)
    elif command == "list":
        sync.list_archives()
    elif command == "cleanup":
        sync.cleanup_storage()
    elif command == "prune-history":
        # Destructive operation: demand an explicit --force confirmation.
        if not force:
            print("ERROR: This is a destructive operation that will delete all backup history.")
            print("Please use 'prune-history --force' to confirm you want to proceed.")
            sys.exit(1)
        sync.prune_repo_history()
    else:
        print(f"Unknown command: {command}")
        print_usage()
def print_usage():
    """Print the CLI help text for sync_storage.py."""
    usage_lines = (
        "Usage: sync_storage.py [command]",
        "\nCommands:",
        " download Download and extract latest data",
        " upload [--force] Upload data (use --force to ignore change detection)",
        " list List all available archives",
        " cleanup Force cleanup of old archives based on MAX_BACKUPS",
        " prune-history --force DANGEROUS: Deletes and recreates the repo to clear all git history.",
        "\nEnvironment variables:",
        " HF_STORAGE_REPO Hugging Face repository ID",
        " HF_TOKEN Hugging Face API token",
        " DATA_DIR Local data directory",
        " MAX_BACKUPS Maximum number of backups to keep (default: 3)",
    )
    for line in usage_lines:
        print(line)
# Script entry point: run the CLI dispatcher when executed directly.
if __name__ == "__main__":
    main()