# DEPENDENCIES
import json
import zipfile
from typing import List, Optional
from pathlib import Path
from datetime import datetime
from config.settings import get_settings
from utils.file_handler import FileHandler
from config.logging_config import get_logger
from utils.error_handler import IndexingError
from vector_store.index_persister import get_index_persister

# Setup Settings and Logging
settings = get_settings()
logger = get_logger(__name__)


class BackupManager:
    """
    Automated backup management for vector indexes and metadata:
    creates compressed backups with versioning
    """

    def __init__(self, backup_dir: Optional[Path] = None, vector_store_dir: Optional[Path] = None):
        """
        Initialize backup manager

        Arguments:
        ----------
            backup_dir { Path } : Directory for backups
            vector_store_dir { Path } : Directory containing indexes to backup
        """
        self.logger = logger
        self.backup_dir = Path(backup_dir or settings.BACKUP_DIR)
        self.vector_store_dir = Path(vector_store_dir or settings.VECTOR_STORE_DIR)

        # Ensure directories exist
        FileHandler.ensure_directory(self.backup_dir)
        FileHandler.ensure_directory(self.vector_store_dir)

        # Backup configuration
        self.auto_backup = settings.AUTO_BACKUP
        # Documents between auto-backups
        self.backup_interval = settings.BACKUP_INTERVAL
        self.backup_count = 0

        self.logger.info(f"Initialized BackupManager: backup_dir={self.backup_dir}, auto_backup={self.auto_backup}")

    def create_backup(self, backup_name: Optional[str] = None, description: Optional[str] = None) -> str:
        """
        Create a backup of the vector store

        Arguments:
        ----------
            backup_name { str } : Name for the backup (default: timestamp-based)
            description { str } : Description of the backup

        Returns:
        --------
            { str } : Path to the backup file
        """
        if not backup_name:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_name = f"backup_{timestamp}"

        backup_path = self.backup_dir / f"{backup_name}.zip"
        self.logger.info(f"Creating backup: {backup_path}")

        try:
            with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                # Add all top-level index files (directories are skipped, and
                # excluded from the file count as well)
                index_files = [path for path in self.vector_store_dir.glob("*") if path.is_file()]
                for file_path in index_files:
                    arcname = file_path.relative_to(self.vector_store_dir)
                    zipf.write(file_path, arcname)
                    self.logger.debug(f"Added to backup: {file_path.name}")

                # Add backup metadata
                backup_metadata = {"backup_name"      : backup_name,
                                   "description"      : description,
                                   "created_at"       : datetime.now().isoformat(),
                                   "file_count"       : len(index_files),
                                   "vector_store_dir" : str(self.vector_store_dir),
                                  }
                zipf.writestr("backup_metadata.json", json.dumps(backup_metadata, indent = 4))

            self.backup_count += 1
            backup_size = backup_path.stat().st_size / (1024 * 1024)  # MB
            self.logger.info(f"Backup created: {backup_path} ({backup_size:.2f} MB)")
            return str(backup_path)

        except Exception as e:
            # Remove any partially written archive so it is not later listed as a valid backup
            backup_path.unlink(missing_ok = True)
            self.logger.error(f"Backup creation failed: {repr(e)}")
            raise IndexingError(f"Backup creation failed: {repr(e)}")
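
    # Illustrative usage (a sketch, not part of the module API; directory values
    # come from settings, and the resulting filename depends on the clock):
    #     manager = BackupManager()
    #     path = manager.create_backup(description = "pre-upgrade snapshot")
    #     # e.g. -> "<BACKUP_DIR>/backup_20240101_120000.zip"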

    def restore_backup(self, backup_path: Path, restore_dir: Optional[Path] = None) -> dict:
        """
        Restore from a backup file

        Arguments:
        ----------
            backup_path { Path } : Path to backup file
            restore_dir { Path } : Directory to restore to (default: vector_store_dir)

        Returns:
        --------
            { dict } : Restore statistics
        """
        backup_path = Path(backup_path)
        restore_dir = Path(restore_dir or self.vector_store_dir)

        if not backup_path.exists():
            raise IndexingError(f"Backup file not found: {backup_path}")

        self.logger.info(f"Restoring backup: {backup_path} to {restore_dir}")

        # Ensure restore directory exists
        FileHandler.ensure_directory(restore_dir)

        try:
            with zipfile.ZipFile(backup_path, 'r') as zipf:
                names = zipf.namelist()

                # Get backup metadata if present
                backup_metadata = dict()
                if "backup_metadata.json" in names:
                    metadata_str = zipf.read("backup_metadata.json").decode('utf-8')
                    backup_metadata = json.loads(metadata_str)

                # Extract everything except the metadata file, so it does not
                # end up inside the restored vector store
                members = [name for name in names if name != "backup_metadata.json"]
                zipf.extractall(restore_dir, members = members)
                restored_files = len(members)

            self.logger.info(f"Restored {restored_files} files from backup")
            return {"restored_files" : restored_files,
                    "backup_name"    : backup_metadata.get("backup_name", "unknown"),
                    "backup_date"    : backup_metadata.get("created_at", "unknown"),
                    "restore_dir"    : str(restore_dir),
                   }

        except Exception as e:
            self.logger.error(f"Backup restoration failed: {repr(e)}")
            raise IndexingError(f"Backup restoration failed: {repr(e)}")
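
    # Illustrative restore (a sketch; the archive path shown is hypothetical):
    #     stats = manager.restore_backup(Path("backups/backup_20240101_120000.zip"))
    #     # e.g. -> {"restored_files": 3, "backup_name": "backup_20240101_120000", ...}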

    def list_backups(self) -> List[dict]:
        """
        List all available backups

        Returns:
        --------
            { list } : List of backup information dictionaries
        """
        backups = list()

        for backup_file in self.backup_dir.glob("*.zip"):
            try:
                with zipfile.ZipFile(backup_file, 'r') as zipf:
                    if "backup_metadata.json" in zipf.namelist():
                        metadata_str = zipf.read("backup_metadata.json").decode('utf-8')
                        metadata = json.loads(metadata_str)
                    else:
                        metadata = {"backup_name": backup_file.stem}

                file_stat = backup_file.stat()
                backup_info = {"name"     : backup_file.name,
                               "path"     : str(backup_file),
                               "size_mb"  : file_stat.st_size / (1024 * 1024),
                               "created"  : datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                               "metadata" : metadata,
                              }
                backups.append(backup_info)

            except Exception as e:
                self.logger.warning(f"Could not read backup info for {backup_file}: {repr(e)}")

        # Sort by creation time (newest first)
        backups.sort(key = lambda x: x["created"],
                     reverse = True,
                     )
        return backups

    def auto_backup_check(self, documents_processed: int) -> Optional[str]:
        """
        Check if an auto-backup should be triggered

        Arguments:
        ----------
            documents_processed { int } : Number of documents processed since the last backup

        Returns:
        --------
            { str } : Backup path if a backup was created, None otherwise
        """
        if not self.auto_backup:
            return None

        if (documents_processed >= self.backup_interval):
            self.logger.info(f"Auto-backup triggered after {documents_processed} documents")
            backup_name = f"auto_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            description = f"Auto-backup after {documents_processed} documents"
            return self.create_backup(backup_name, description)

        return None
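
    # Illustrative ingestion-loop pattern (a sketch; "docs_since_backup" is a
    # hypothetical counter the caller maintains and resets after each backup):
    #     docs_since_backup += len(new_documents)
    #     if manager.auto_backup_check(docs_since_backup):
    #         docs_since_backup = 0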

    def cleanup_old_backups(self, keep_count: int = 5, keep_days: int = 30) -> dict:
        """
        Clean up old backups based on count and age

        Arguments:
        ----------
            keep_count { int } : Number of most recent backups to keep
            keep_days { int } : Maximum age of backups in days

        Returns:
        --------
            { dict } : Cleanup results
        """
        backups = self.list_backups()

        if (len(backups) <= keep_count):
            return {"deleted" : 0,
                    "kept"    : len(backups),
                    "message" : "No cleanup needed",
                   }

        # Sort by date (oldest first for deletion)
        backups.sort(key = lambda x: x["created"])

        deleted_count = 0
        cutoff_date = datetime.now().timestamp() - (keep_days * 24 * 60 * 60)

        # Candidates for deletion: everything beyond the newest keep_count
        for backup in backups[:-keep_count]:
            backup_date = datetime.fromisoformat(backup["created"]).timestamp()
            # Delete if older than the cutoff or still over the keep_count limit
            if ((backup_date < cutoff_date) or (len(backups) - deleted_count > keep_count)):
                try:
                    Path(backup["path"]).unlink()
                    deleted_count += 1
                    self.logger.info(f"Deleted old backup: {backup['name']}")
                except Exception as e:
                    self.logger.warning(f"Failed to delete backup {backup['name']}: {repr(e)}")

        remaining = len(backups) - deleted_count
        self.logger.info(f"Backup cleanup: deleted {deleted_count}, kept {remaining}")
        return {"deleted"      : deleted_count,
                "kept"         : remaining,
                "total_before" : len(backups),
                "message"      : f"Cleanup completed: {deleted_count} backups deleted",
               }
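
    # Illustrative retention call (a sketch). Note that, as written, every
    # candidate beyond the newest keep_count is deleted regardless of age,
    # since the second branch of the condition above holds for all of them:
    #     result = manager.cleanup_old_backups(keep_count = 5, keep_days = 30)
    #     # e.g. -> {"deleted": 3, "kept": 5, "total_before": 8, ...}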

    def get_backup_stats(self) -> dict:
        """
        Get backup statistics

        Returns:
        --------
            { dict } : Backup statistics
        """
        backups = self.list_backups()
        total_size = sum(backup["size_mb"] for backup in backups)
        oldest_backup = min(backups, key = lambda x: x["created"])["created"] if backups else None
        newest_backup = max(backups, key = lambda x: x["created"])["created"] if backups else None

        return {"total_backups"   : len(backups),
                "total_size_mb"   : total_size,
                "oldest_backup"   : oldest_backup,
                "newest_backup"   : newest_backup,
                "auto_backup"     : self.auto_backup,
                "backup_interval" : self.backup_interval,
                "backup_dir"      : str(self.backup_dir),
               }

    def verify_backup(self, backup_path: Path) -> bool:
        """
        Verify backup integrity

        Arguments:
        ----------
            backup_path { Path } : Path to backup file

        Returns:
        --------
            { bool } : True if backup is valid
        """
        try:
            with zipfile.ZipFile(backup_path, 'r') as zipf:
                # Test zip integrity
                bad_file = zipf.testzip()
                if bad_file is not None:
                    self.logger.error(f"Backup corrupted: {bad_file}")
                    return False

                # Check for essential files
                persister = get_index_persister()
                essential_files = [persister.faiss_index_path.name,
                                   persister.faiss_metadata_path.name,
                                   persister.bm25_index_path.name,
                                  ]
                existing_files = zipf.namelist()
                missing_files = [f for f in essential_files if f not in existing_files]
                if missing_files:
                    # Not necessarily invalid, but incomplete
                    self.logger.warning(f"Backup missing files: {missing_files}")

            return True

        except Exception as e:
            self.logger.error(f"Backup verification failed: {repr(e)}")
            return False


# Global backup manager instance
_backup_manager: Optional[BackupManager] = None


def get_backup_manager(backup_dir: Optional[Path] = None, vector_store_dir: Optional[Path] = None) -> BackupManager:
    """
    Get the global backup manager instance

    Arguments:
    ----------
        backup_dir { Path } : Backup directory
        vector_store_dir { Path } : Vector store directory

    Returns:
    --------
        { BackupManager } : BackupManager instance
    """
    global _backup_manager

    if _backup_manager is None:
        _backup_manager = BackupManager(backup_dir, vector_store_dir)

    return _backup_manager


def create_backup(backup_name: Optional[str] = None, **kwargs) -> str:
    """
    Convenience function to create a backup

    Arguments:
    ----------
        backup_name { str } : Backup name
        **kwargs : Additional arguments passed to BackupManager.create_backup

    Returns:
    --------
        { str } : Backup path
    """
    manager = get_backup_manager()
    return manager.create_backup(backup_name, **kwargs)
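

# Minimal smoke-test sketch (an illustration, not part of the original module).
# It assumes the surrounding project packages (config.*, utils.*, vector_store.*)
# are importable and that settings point at real directories.
if __name__ == "__main__":
    manager = get_backup_manager()

    # Create a backup and verify its integrity
    path = create_backup(description = "manual smoke test")
    print(f"Created: {path}, valid = {manager.verify_backup(Path(path))}")

    # Inspect available backups and overall statistics
    for info in manager.list_backups():
        print(f"{info['name']}: {info['size_mb']:.2f} MB, created {info['created']}")
    print(manager.get_backup_stats())

    # Prune to the five most recent backups
    print(manager.cleanup_old_backups(keep_count = 5, keep_days = 30))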