#!/usr/bin/env python3
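"""Sync Open WebUI persistent data to a private Hugging Face dataset repo.

Creates timestamped, compressed backups with change detection, keeps at most
MAX_BACKUPS archives, and can (destructively) prune the repo's git history to
reclaim Hub storage.

Command-line usage (see print_usage below):

    sync_storage.py download                 # restore the latest archive
    sync_storage.py upload [--force]         # back up the data directory
    sync_storage.py list                     # list stored archives
    sync_storage.py cleanup                  # delete archives beyond MAX_BACKUPS
    sync_storage.py prune-history --force    # DANGEROUS: recreate repo, purge history

The class can also be driven programmatically, mirroring what main() does
(repo ID below is an illustrative placeholder):

    sync = HFStorageSync("org/open-webui-storage", token=os.environ["HF_TOKEN"])
    sync.download_data()
    sync.upload_data()
"""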
import os
import shutil
import json
import hashlib
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, create_repo, list_repo_files, delete_repo
import tarfile
import tempfile
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class HFStorageSync:
    def __init__(self, repo_id, token=None, data_dir="/tmp/open-webui-data",
                 max_backups=3, compression_level=6):
        self.repo_id = repo_id
        self.data_dir = Path(data_dir)
        self.token = token
        self.max_backups = max_backups
        self.compression_level = compression_level
        # Initialize API with token directly
        self.api = HfApi(token=token) if token else HfApi()
        # File patterns for better organization
        self.archive_pattern = "data-{timestamp}.tar.gz"
        self.latest_link = "data-latest.tar.gz"
        self.metadata_file = "storage-metadata.json"

    def _get_directory_hash(self):
        """Calculate hash of directory contents for change detection"""
        hasher = hashlib.sha256()
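        # The fingerprint covers each file's relative path, mtime, and size
        # rather than its contents: cheap to compute, but edits that preserve
        # both mtime and size go undetected.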
        if not self.data_dir.exists():
            return hasher.hexdigest()

        for item in sorted(self.data_dir.rglob('*')):
            if item.is_file() and item.name not in [".gitkeep", "test_write"]:
                hasher.update(str(item.relative_to(self.data_dir)).encode())
                hasher.update(str(item.stat().st_mtime).encode())
                hasher.update(str(item.stat().st_size).encode())
        return hasher.hexdigest()

    def _get_archive_size(self, archive_path):
        """Get the size of an archive file"""
        try:
            return os.path.getsize(archive_path)
        except OSError:
            return 0

    def _format_size(self, size_bytes):
        """Format file size in human readable format"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.1f} TB"

    def ensure_repo_exists(self):
        """Create repository if it doesn't exist"""
        if not self.token:
            logger.warning("No token provided, cannot create repository")
            return False
        try:
            # Check if repo exists
            self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
            logger.info(f"Repository {self.repo_id} exists")
            return True
        except Exception:
            logger.info(f"Repository {self.repo_id} not found, attempting to create...")
            try:
                create_repo(
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token,
                    private=True,
                    exist_ok=True
                )
                logger.info(f"Created repository {self.repo_id}")
                # Create initial README and metadata
                self._create_initial_files()
                return True
            except Exception as create_error:
                logger.error(f"Failed to create repository: {create_error}")
                return False

    def _create_initial_files(self):
        """Create initial repository files"""
        readme_content = """# Open WebUI Storage
This dataset stores persistent data for Open WebUI deployment with automatic cleanup and versioning.
## Contents
- `data-latest.tar.gz`: Copy of the most recent data archive
- `data-YYYYMMDD-HHMMSS.tar.gz`: Timestamped data archives
- `storage-metadata.json`: Metadata about stored archives
- `README.md`: This file
## Features
- Automatic cleanup of old backups
- Change detection to avoid unnecessary uploads
- Compression optimization
- Storage usage monitoring
This repository is automatically managed by the Open WebUI sync system.
"""
        metadata = {
            "created": datetime.utcnow().isoformat(),
            "max_backups": self.max_backups,
            "archives": [],
            "total_size": 0
        }
        # Upload README
        with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp:
            tmp.write(readme_content)
            tmp.flush()
            self.api.upload_file(
                path_or_fileobj=tmp.name,
                path_in_repo="README.md",
                repo_id=self.repo_id,
                repo_type="dataset",
                commit_message="Initial repository setup",
                token=self.token
            )
            os.unlink(tmp.name)
        # Upload metadata
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
            json.dump(metadata, tmp, indent=2)
            tmp.flush()
            self.api.upload_file(
                path_or_fileobj=tmp.name,
                path_in_repo=self.metadata_file,
                repo_id=self.repo_id,
                repo_type="dataset",
                commit_message="Initial metadata",
                token=self.token
            )
            os.unlink(tmp.name)

    def _get_metadata(self):
        """Download and parse metadata"""
        try:
            file_path = self.api.hf_hub_download(
                repo_id=self.repo_id,
                filename=self.metadata_file,
                repo_type="dataset",
                token=self.token
            )
            with open(file_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            logger.warning(f"Could not load metadata: {e}")
            return {
                "created": datetime.utcnow().isoformat(),
                "max_backups": self.max_backups,
                "archives": [],
                "total_size": 0
            }

    def _update_metadata(self, metadata):
        """Upload updated metadata"""
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
                json.dump(metadata, tmp, indent=2)
                tmp.flush()
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=self.metadata_file,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message="Update metadata",
                    token=self.token
                )
                os.unlink(tmp.name)
        except Exception as e:
            logger.error(f"Failed to update metadata: {e}")

    def _cleanup_old_archives(self, metadata):
        """Remove old archives beyond max_backups limit"""
        if len(metadata["archives"]) <= self.max_backups:
            return metadata
        # Sort by timestamp, keep newest
        archives = sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True)
        to_keep = archives[:self.max_backups]
        to_delete = archives[self.max_backups:]
        total_deleted_size = 0
        for archive in to_delete:
            try:
                self.api.delete_file(
                    path_in_repo=archive["filename"],
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token
                )
                total_deleted_size += archive["size"]
                logger.info(f"Deleted old archive: {archive['filename']} ({self._format_size(archive['size'])})")
            except Exception as e:
                logger.warning(f"Failed to delete {archive['filename']}: {e}")
        metadata["archives"] = to_keep
        metadata["total_size"] -= total_deleted_size
        if total_deleted_size > 0:
            logger.info(f"Cleaned up {self._format_size(total_deleted_size)} of storage")
        return metadata

    def download_data(self):
        """Download and extract latest data from HF dataset repo"""
        try:
            logger.info("Downloading data from Hugging Face...")
            # Ensure data directory exists and is writable
            self.data_dir.mkdir(parents=True, exist_ok=True)
            # Test write permissions
            test_file = self.data_dir / "test_write"
            try:
                test_file.touch()
                test_file.unlink()
                logger.info(f"Data directory {self.data_dir} is writable")
            except Exception as e:
                logger.warning(f"Data directory may not be writable: {e}")
                return
            if not self.token:
                logger.warning("No HF_TOKEN provided, skipping download")
                return
            # Ensure repository exists
            if not self.ensure_repo_exists():
                logger.error("Could not access or create repository")
                return
            # Try to download the latest data archive
            try:
                metadata = self._get_metadata()
                current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token)
                current_files = sorted(i for i in current_files if i.startswith("data-"))

                logger.info(f"Found {len(current_files)} files to preserve.")
                archive_filenames = [i["filename"] for i in metadata["archives"]]
                logger.info(f"Archive filenames: {archive_filenames}")
                logger.info(f"Current filenames: {current_files}")
                
                latest_link = self.latest_link
                logger.info(f"Current latest_link: {latest_link}")
                
                if latest_link not in current_files:
                    if current_files:
                        latest_link = current_files[-1]
                        logger.info(f"Latest link not found, falling back to the most recent archive: {latest_link}")
                    else:
                        logger.error("No archives found in repository")
                        return
                logger.info(f"Downloading latest data archive: {latest_link}")
                # First try the latest link
                file_path = self.api.hf_hub_download(
                    repo_id=self.repo_id,
                    filename=latest_link,
                    repo_type="dataset",
                    token=self.token
                )
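                # extractall() trusts member paths inside the archive; these
                # archives come from our own private repo, but Python 3.12+
                # warns unless an extraction filter (e.g. filter="data") is set.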
                with tarfile.open(file_path, 'r:gz') as tar:
                    tar.extractall(self.data_dir)
                archive_size = self._get_archive_size(file_path)
                logger.info(f"Data extracted to {self.data_dir} ({self._format_size(archive_size)})")
            except Exception as e:
                logger.info(f"No existing data found (normal for first run): {e}")
        except Exception as e:
            logger.error(f"Error during download: {e}")

    def upload_data(self, force=False):
        """Compress and upload data to HF dataset repo with change detection"""
        try:
            if not self.token:
                logger.warning("No HF_TOKEN provided, skipping upload")
                return
            if not self.data_dir.exists() or not any(self.data_dir.iterdir()):
                logger.warning("No data to upload")
                return
            # Calculate current directory hash
            current_hash = self._get_directory_hash()
            # Get metadata to check for changes
            metadata = self._get_metadata()
            last_hash = metadata.get("last_hash")
            if not force and current_hash == last_hash:
                logger.info("No changes detected, skipping upload")
                return
            logger.info("Changes detected, preparing upload...")
            # Ensure repository exists
            if not self.ensure_repo_exists():
                logger.error("Could not access or create repository")
                return
            # Create timestamped filename
            timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
            archive_filename = self.archive_pattern.format(timestamp=timestamp)
            # Create temporary archive with optimized compression
            with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp:
                with tarfile.open(tmp.name, 'w:gz', compresslevel=self.compression_level) as tar:
                    total_files = 0
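                    # Exclude the same housekeeping entries that the directory
                    # hash skips, so archives and fingerprints stay consistent.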
                    for item in self.data_dir.iterdir():
                        if item.name not in ["test_write", ".gitkeep"]:
                            tar.add(item, arcname=item.name)
                            if item.is_file():
                                total_files += 1
                            elif item.is_dir():
                                total_files += sum(1 for _f in item.rglob('*') if _f.is_file())
                archive_size = self._get_archive_size(tmp.name)
                logger.info(f"Created archive: {self._format_size(archive_size)}, {total_files} files")
                # Upload timestamped archive
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=archive_filename,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update Open WebUI data - {timestamp}",
                    token=self.token
                )
                # Upload as latest (overwrite)
                self.api.upload_file(
                    path_or_fileobj=tmp.name,
                    path_in_repo=self.latest_link,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    commit_message=f"Update latest data - {timestamp}",
                    token=self.token
                )
                # Clean up temp file
                os.unlink(tmp.name)
            # Update metadata
            archive_info = {
                "filename": archive_filename,
                "timestamp": timestamp,
                "size": archive_size,
                "hash": current_hash,
                "files_count": total_files
            }
            metadata["archives"].append(archive_info)
            metadata["total_size"] += archive_size
            metadata["last_hash"] = current_hash
            metadata["last_upload"] = datetime.utcnow().isoformat()
            # Cleanup old archives
            metadata = self._cleanup_old_archives(metadata)
            # Update metadata
            self._update_metadata(metadata)
            logger.info(f"Upload successful: {archive_filename} ({self._format_size(archive_size)})")
            logger.info(f"Total storage used: {self._format_size(metadata['total_size'])}")

            # Purge git history by deleting and recreating the repo; without
            # this, old revisions of every archive keep counting against storage.
            self.prune_repo_history()
        except Exception as e:
            logger.error(f"Error uploading data: {e}")

    def list_archives(self):
        """List all available archives"""
        try:
            metadata = self._get_metadata()
            if not metadata["archives"]:
                logger.info("No archives found")
                return
            logger.info("Available archives:")
            logger.info("-" * 60)
            total_size = 0
            for archive in sorted(metadata["archives"], key=lambda x: x["timestamp"], reverse=True):
                size_str = self._format_size(archive["size"])
                files_count = archive.get("files_count", "unknown")
                logger.info(f"{archive['filename']:<30} {size_str:>10} {files_count:>6} files")
                total_size += archive["size"]
            logger.info("-" * 60)
            logger.info(f"Total: {len(metadata['archives'])} archives, {self._format_size(total_size)}")
        except Exception as e:
            logger.error(f"Error listing archives: {e}")

    def cleanup_storage(self):
        """Force cleanup of old archives"""
        try:
            metadata = self._get_metadata()
            old_size = metadata["total_size"]
            metadata = self._cleanup_old_archives(metadata)
            self._update_metadata(metadata)
            saved = old_size - metadata["total_size"]
            if saved > 0:
                logger.info(f"Cleanup completed. Saved {self._format_size(saved)} of storage")
            else:
                logger.info("No cleanup needed")
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")

    def prune_repo_history(self):
        """
        DANGEROUS: Deletes and recreates the repo to purge git history and reduce storage.
        This will erase all old versions, keeping only the current files.
        """
        logger.warning("="*60)
        logger.warning("!!! DESTRUCTIVE OPERATION INITIATED: PRUNE REPO HISTORY !!!")
        logger.warning(f"This will permanently delete the repository '{self.repo_id}' and its entire Git history.")
        logger.warning("Only the most recent versions of files will be preserved.")
        logger.warning("="*60)

        try:
            # 1. Get the list of current files to preserve
            logger.info("Step 1/5: Listing current files in the repository...")
            current_files = list_repo_files(self.repo_id, repo_type="dataset", token=self.token)
            if not current_files:
                logger.warning("Repository is empty or inaccessible. Nothing to prune.")
                return
            metadata = self._get_metadata()

            logger.info(f"Found {len(current_files)} files to preserve.")
            archive_filenames = [i["filename"] for i in metadata["archives"]]
            archive_filenames.append(self.latest_link)  # Also preserve latest link
            logger.info(f"Archive filenames: {archive_filenames}")

            # 2. Download all current files to a temporary directory
            with tempfile.TemporaryDirectory() as tmpdir:
                logger.info(f"Step 2/5: Downloading current files to a temporary location...")
                for file_path in current_files:
                    if file_path.startswith("data-") and (file_path not in archive_filenames):
                        logger.info(f"Skipping {file_path} (not in archive)")
                        continue
                    self.api.hf_hub_download(
                        repo_id=self.repo_id,
                        filename=file_path,
                        repo_type="dataset",
                        token=self.token,
                        local_dir=tmpdir,
                        local_dir_use_symlinks=False
                    )
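                # From this point until upload_folder() completes, the only
                # copy of the preserved files lives in tmpdir; a failure in
                # between loses the remote data, hence the loud warnings above.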
                # 3. Delete the entire repository
                logger.warning(f"Step 3/5: Deleting repository '{self.repo_id}'...")
                delete_repo(self.repo_id, repo_type="dataset", token=self.token)
                logger.info("Repository deleted successfully.")

                # 4. Re-create the repository (now empty with no history)
                logger.info(f"Step 4/5: Re-creating repository '{self.repo_id}'...")
                self.ensure_repo_exists() # This will create it and initial files
                logger.info("Repository re-created successfully.")

                # 5. Upload the preserved files back to the new repository
                logger.info("Step 5/5: Uploading preserved files to the new repository...")
                self.api.upload_folder(
                    folder_path=tmpdir,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.token,
                    commit_message="Repo history pruned, restoring current files"
                )

            logger.info("="*60)
            logger.info("Repository history prune complete. Storage usage has been reset.")
            logger.info("="*60)

        except Exception as e:
            logger.error(f"An error occurred during the prune operation: {e}")
            logger.error("The repository may be in an inconsistent state. Please check the Hugging Face Hub.")

def main():
    import sys
    repo_id = os.getenv("HF_STORAGE_REPO", "nxdev-org/open-webui-storage")
    token = os.getenv("HF_TOKEN")
    data_dir = os.getenv("DATA_DIR", "/tmp/open-webui-data")
    max_backups = int(os.getenv("MAX_BACKUPS", "3"))

    sync = HFStorageSync(repo_id, token, data_dir, max_backups=max_backups)

    if len(sys.argv) > 1:
        command = sys.argv[1].lower()
        force = "--force" in sys.argv

        if command == "download":
            sync.download_data()
        elif command == "upload":
            sync.upload_data(force=force)
        elif command == "list":
            sync.list_archives()
        elif command == "cleanup":
            sync.cleanup_storage()
        elif command == "prune-history":
            if not force:
                print("ERROR: This is a destructive operation that will delete all backup history.")
                print("Please use 'prune-history --force' to confirm you want to proceed.")
                sys.exit(1)
            sync.prune_repo_history()
        else:
            print(f"Unknown command: {command}")
            print_usage()
    else:
        print_usage()

def print_usage():
    print("Usage: sync_storage.py [command]")
    print("\nCommands:")
    print("  download                 Download and extract latest data")
    print("  upload [--force]         Upload data (use --force to ignore change detection)")
    print("  list                     List all available archives")
    print("  cleanup                  Force cleanup of old archives based on MAX_BACKUPS")
    print("  prune-history --force    DANGEROUS: Deletes and recreates the repo to clear all git history.")
    print("\nEnvironment variables:")
    print("  HF_STORAGE_REPO          Hugging Face repository ID")
    print("  HF_TOKEN                 Hugging Face API token")
    print("  DATA_DIR                 Local data directory")
    print("  MAX_BACKUPS              Maximum number of backups to keep (default: 3)")


if __name__ == "__main__":
    main()