#!/usr/bin/env python3
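"""Sync the Open WebUI data directory to and from a Hugging Face dataset repo.

Supports change detection (uploads are skipped when nothing changed), gzip
compression, automatic rotation of old backups, and basic storage reporting.
"""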
import os
import shutil
import json
import hashlib
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, create_repo
import tarfile
import tempfile
import logging
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class HFStorageSync:
def __init__(self, repo_id, token=None, data_dir="/tmp/open-webui-data",
max_backups=3, compression_level=6):
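        """
        Args:
            repo_id: Hugging Face dataset repository ID (e.g. "org/name").
            token: Hugging Face API token; without it, remote operations are skipped.
            data_dir: Local directory to sync.
            max_backups: Number of timestamped archives to keep in the repo.
            compression_level: gzip compression level (0-9) used for archives.
        """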
self.repo_id = repo_id
self.data_dir = Path(data_dir)
self.token = token
self.max_backups = max_backups
self.compression_level = compression_level
# Initialize API with token directly
self.api = HfApi(token=token) if token else HfApi()
# File patterns for better organization
self.archive_pattern = "data-{timestamp}.tar.gz"
self.latest_link = "data-latest.tar.gz"
self.metadata_file = "storage-metadata.json"
def _get_directory_hash(self):
"""Calculate hash of directory contents for change detection"""
hasher = hashlib.sha256()
if not self.data_dir.exists():
return hasher.hexdigest()
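        # Hash relative paths plus mtime and size rather than file contents:
        # a cheap fingerprint that is enough to tell whether anything changed
        # since the last upload.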
for item in sorted(self.data_dir.rglob('*')):
if item.is_file() and item.name not in [".gitkeep", "test_write"]:
hasher.update(str(item.relative_to(self.data_dir)).encode())
hasher.update(str(item.stat().st_mtime).encode())
hasher.update(str(item.stat().st_size).encode())
return hasher.hexdigest()
def _get_archive_size(self, archive_path):
"""Get the size of an archive file"""
try:
return os.path.getsize(archive_path)
        except OSError:
            return 0
def _format_size(self, size_bytes):
"""Format file size in human readable format"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
def ensure_repo_exists(self):
"""Create repository if it doesn't exist"""
if not self.token:
logger.warning("No token provided, cannot create repository")
return False
try:
            # Check if repo exists
            self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
logger.info(f"Repository {self.repo_id} exists")
return True
except Exception as e:
logger.info(f"Repository {self.repo_id} not found, attempting to create...")
try:
create_repo(
repo_id=self.repo_id,
repo_type="dataset",
token=self.token,
private=True,
exist_ok=True
)
logger.info(f"Created repository {self.repo_id}")
# Create initial README and metadata
self._create_initial_files()
return True
except Exception as create_error:
logger.error(f"Failed to create repository: {create_error}")
return False
def _create_initial_files(self):
"""Create initial repository files"""
readme_content = """# Open WebUI Storage
This dataset stores persistent data for Open WebUI deployment with automatic cleanup and versioning.
## Contents
- `data-latest.tar.gz`: Latest data archive (symlink)
- `data-YYYYMMDD-HHMMSS.tar.gz`: Timestamped data archives
- `storage-metadata.json`: Metadata about stored archives
- `README.md`: This file
## Features
- Automatic cleanup of old backups
- Change detection to avoid unnecessary uploads
- Compression optimization
- Storage usage monitoring
This repository is automatically managed by the Open WebUI sync system.
"""
metadata = {
"created": datetime.utcnow().isoformat(),
"max_backups": self.max_backups,
"archives": [],
"total_size": 0
}
# Upload README
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp:
tmp.write(readme_content)
tmp.flush()
self.api.upload_file(
path_or_fileobj=tmp.name,
path_in_repo="README.md",
repo_id=self.repo_id,
repo_type="dataset",
commit_message="Initial repository setup",
token=self.token
)
os.unlink(tmp.name)
# Upload metadata
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
json.dump(metadata, tmp, indent=2)
tmp.flush()
self.api.upload_file(
path_or_fileobj=tmp.name,
path_in_repo=self.metadata_file,
repo_id=self.repo_id,
repo_type="dataset",
commit_message="Initial metadata",
token=self.token
)
os.unlink(tmp.name)
def _get_metadata(self):
"""Download and parse metadata"""
try:
file_path = self.api.hf_hub_download(
repo_id=self.repo_id,
filename=self.metadata_file,
repo_type="dataset",
token=self.token
)
with open(file_path, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Could not load metadata: {e}")
return {
"created": datetime.utcnow().isoformat(),
"max_backups": self.max_backups,
"archives": [],
"total_size": 0
}
def _update_metadata(self, metadata):
"""Upload updated metadata"""
try:
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
json.dump(metadata, tmp, indent=2)
tmp.flush()
self.api.upload_file(
path_or_fileobj=tmp.name,
path_in_repo=self.metadata_file,
repo_id=self.repo_id,
repo_type="dataset",
commit_message="Update metadata",
token=self.token
)
os.unlink(tmp.name)
except Exception as e:
logger.error(f"Failed to update metadata: {e}")
def _cleanup_old_archives(self, metadata):
"""Remove old archives beyond max_backups limit"""
if len(metadata["archives"]) <= self.max_backups:
return metadata
# Sort by timestamp, keep newest
archives = sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True)
to_keep = archives[:self.max_backups]
to_delete = archives[self.max_backups:]
total_deleted_size = 0
for archive in to_delete:
try:
self.api.delete_file(
path_in_repo=archive["filename"],
repo_id=self.repo_id,
repo_type="dataset",
token=self.token
)
total_deleted_size += archive["size"]
logger.info(f"Deleted old archive: {archive['filename']} ({self._format_size(archive['size'])})")
except Exception as e:
logger.warning(f"Failed to delete {archive['filename']}: {e}")
metadata["archives"] = to_keep
metadata["total_size"] -= total_deleted_size
if total_deleted_size > 0:
logger.info(f"Cleaned up {self._format_size(total_deleted_size)} of storage")
return metadata
def download_data(self):
"""Download and extract latest data from HF dataset repo"""
try:
logger.info("Downloading data from Hugging Face...")
# Ensure data directory exists and is writable
self.data_dir.mkdir(parents=True, exist_ok=True)
# Test write permissions
test_file = self.data_dir / "test_write"
try:
test_file.touch()
test_file.unlink()
logger.info(f"Data directory {self.data_dir} is writable")
except Exception as e:
logger.warning(f"Data directory may not be writable: {e}")
return
if not self.token:
logger.warning("No HF_TOKEN provided, skipping download")
return
# Ensure repository exists
if not self.ensure_repo_exists():
logger.error("Could not access or create repository")
return
# Try to download the latest data archive
try:
# First try the latest link
file_path = self.api.hf_hub_download(
repo_id=self.repo_id,
filename=self.latest_link,
repo_type="dataset",
token=self.token
)
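                # Extract the archive's top-level entries straight into the data
                # directory (upload_data stores each item with arcname=item.name).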
with tarfile.open(file_path, 'r:gz') as tar:
tar.extractall(self.data_dir)
archive_size = self._get_archive_size(file_path)
logger.info(f"Data extracted to {self.data_dir} ({self._format_size(archive_size)})")
except Exception as e:
logger.info(f"No existing data found (normal for first run): {e}")
except Exception as e:
logger.error(f"Error during download: {e}")
def upload_data(self, force=False):
"""Compress and upload data to HF dataset repo with change detection"""
try:
if not self.token:
logger.warning("No HF_TOKEN provided, skipping upload")
return
if not self.data_dir.exists() or not any(self.data_dir.iterdir()):
logger.warning("No data to upload")
return
# Calculate current directory hash
current_hash = self._get_directory_hash()
# Get metadata to check for changes
metadata = self._get_metadata()
last_hash = metadata.get("last_hash")
if not force and current_hash == last_hash:
logger.info("No changes detected, skipping upload")
return
logger.info("Changes detected, preparing upload...")
# Ensure repository exists
if not self.ensure_repo_exists():
logger.error("Could not access or create repository")
return
# Create timestamped filename
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
archive_filename = self.archive_pattern.format(timestamp=timestamp)
# Create temporary archive with optimized compression
with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp:
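            # delete=False keeps the temp archive on disk after the handle closes;
            # it is removed explicitly with os.unlink() once the uploads finish.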
            with tarfile.open(tmp.name, 'w:gz', compresslevel=self.compression_level) as tar:
total_files = 0
for item in self.data_dir.iterdir():
if item.name not in ["test_write", ".gitkeep"]:
tar.add(item, arcname=item.name)
if item.is_file():
total_files += 1
elif item.is_dir():
total_files += sum(1 for _ in item.rglob('*') if _.is_file())
archive_size = self._get_archive_size(tmp.name)
logger.info(f"Created archive: {self._format_size(archive_size)}, {total_files} files")
# Upload timestamped archive
self.api.upload_file(
path_or_fileobj=tmp.name,
path_in_repo=archive_filename,
repo_id=self.repo_id,
repo_type="dataset",
commit_message=f"Update Open WebUI data - {timestamp}",
token=self.token
)
# Upload as latest (overwrite)
self.api.upload_file(
path_or_fileobj=tmp.name,
path_in_repo=self.latest_link,
repo_id=self.repo_id,
repo_type="dataset",
commit_message=f"Update latest data - {timestamp}",
token=self.token
)
# Clean up temp file
os.unlink(tmp.name)
# Update metadata
archive_info = {
"filename": archive_filename,
"timestamp": timestamp,
"size": archive_size,
"hash": current_hash,
"files_count": total_files
}
metadata["archives"].append(archive_info)
metadata["total_size"] += archive_size
metadata["last_hash"] = current_hash
metadata["last_upload"] = datetime.utcnow().isoformat()
# Cleanup old archives
metadata = self._cleanup_old_archives(metadata)
# Update metadata
self._update_metadata(metadata)
logger.info(f"Upload successful: {archive_filename} ({self._format_size(archive_size)})")
logger.info(f"Total storage used: {self._format_size(metadata['total_size'])}")
except Exception as e:
logger.error(f"Error uploading data: {e}")
def list_archives(self):
"""List all available archives"""
try:
metadata = self._get_metadata()
if not metadata["archives"]:
logger.info("No archives found")
return
logger.info("Available archives:")
logger.info("-" * 60)
total_size = 0
for archive in sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True):
size_str = self._format_size(archive["size"])
files_count = archive.get("files_count", "unknown")
logger.info(f"{archive['filename']:<30} {size_str:>10} {files_count:>6} files")
total_size += archive["size"]
logger.info("-" * 60)
logger.info(f"Total: {len(metadata['archives'])} archives, {self._format_size(total_size)}")
except Exception as e:
logger.error(f"Error listing archives: {e}")
def cleanup_storage(self):
"""Force cleanup of old archives"""
try:
metadata = self._get_metadata()
old_size = metadata["total_size"]
metadata = self._cleanup_old_archives(metadata)
self._update_metadata(metadata)
saved = old_size - metadata["total_size"]
if saved > 0:
logger.info(f"Cleanup completed. Saved {self._format_size(saved)} of storage")
else:
logger.info("No cleanup needed")
except Exception as e:
logger.error(f"Error during cleanup: {e}")
def main():
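    """CLI entry point: download, upload [--force], list, or cleanup."""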
import sys
repo_id = os.getenv("HF_STORAGE_REPO", "nxdev-org/open-webui-storage")
token = os.getenv("HF_TOKEN")
data_dir = os.getenv("DATA_DIR", "/tmp/open-webui-data")
max_backups = int(os.getenv("MAX_BACKUPS", "3"))
sync = HFStorageSync(repo_id, token, data_dir, max_backups=max_backups)
if len(sys.argv) > 1:
command = sys.argv[1].lower()
if command == "download":
sync.download_data()
elif command == "upload":
force = len(sys.argv) > 2 and sys.argv[2] == "--force"
sync.upload_data(force=force)
elif command == "list":
sync.list_archives()
elif command == "cleanup":
sync.cleanup_storage()
else:
print("Usage: sync_storage.py [download|upload [--force]|list|cleanup]")
else:
print("Usage: sync_storage.py [download|upload [--force]|list|cleanup]")
print("\nCommands:")
print(" download Download and extract latest data")
print(" upload Upload data (only if changed)")
print(" upload --force Force upload even if no changes detected")
print(" list List all available archives")
print(" cleanup Force cleanup of old archives")
print("\nEnvironment variables:")
print(" HF_STORAGE_REPO Hugging Face repository ID")
print(" HF_TOKEN Hugging Face API token")
print(" DATA_DIR Local data directory")
print(" MAX_BACKUPS Maximum number of backups to keep (default: 3)")
"""
# Download latest data
python sync_storage.py download
# Upload only if changed (default)
python sync_storage.py upload
# Force upload regardless of changes
python sync_storage.py upload --force
# List all archives
python sync_storage.py list
# Clean up old archives
python sync_storage.py cleanup
# Set custom backup limit
MAX_BACKUPS=5 python sync_storage.py upload
"""
if __name__ == "__main__":
main()