| """ | |
| S3 Backup Manager | |
| Handles automatic backup of SQLite database to S3-compatible storage. | |
| Provides debouncing to prevent excessive S3 uploads and background | |
| threading for non-blocking backup operations. | |
| """ | |
import hashlib
import logging
import os
import sqlite3
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

import boto3
from botocore.exceptions import ClientError

from ..utils import s3_logger
from .s3_config import (
    S3Config,
    S3BackupError,
    S3CredentialsError,
    S3BucketNotFoundError,
    S3ConnectionError,
    DatabaseCorruptedError,
)
# Module-level logger, named after this module per the standard logging pattern.
logger = logging.getLogger(__name__)
@dataclass
class BackupMetadata:
    """Metadata for a backup file in S3.

    Fix: the ``@dataclass`` decorator was missing, so the annotated fields
    produced no ``__init__`` and ``BackupMetadata(s3_key=..., ...)`` (as used
    by ``BackupManager.get_latest_backup``) raised ``TypeError``. The
    decorator is already imported at module level.
    """

    # S3 object key of the backup (e.g. "contacts-2024-01-01-00-00-00.db")
    s3_key: str
    # LastModified timestamp reported by S3
    last_modified: datetime
    # Object size in bytes as reported by S3
    size_bytes: int
    # SHA-256 hex digest stored in object metadata; None if unavailable
    checksum_sha256: Optional[str] = None
class BackupManager:
    """
    Manages automatic backup of SQLite database to S3.

    Features:
    - Non-blocking backup requests with debouncing
    - Background thread execution
    - Retry logic with exponential backoff
    - Checksum validation
    - Graceful error handling
    """

    def __init__(self, config: S3Config, db_path: str):
        """
        Initialize the backup manager.

        Args:
            config: S3 configuration object
            db_path: Absolute path to SQLite database file

        Raises:
            ValueError: If db_path does not exist or is not readable
        """
        if not os.path.exists(db_path):
            raise ValueError(f"Database file does not exist: {db_path}")
        if not os.access(db_path, os.R_OK):
            raise ValueError(f"Database file is not readable: {db_path}")

        self.config = config
        self.db_path = db_path
        # time.time() of the most recent request_backup() call; guarded by
        # backup_lock so the debounce thread and requesters stay consistent.
        self.last_backup_request = None
        self.backup_lock = threading.Lock()
        self._debounce_thread = None

        if config.enabled:
            self.s3_client = config.create_s3_client()
            logger.info(f"BackupManager initialized for {db_path}")
        else:
            self.s3_client = None
            logger.info("BackupManager initialized but S3 is disabled")

    def request_backup(self) -> None:
        """
        Non-blocking method to request a database backup.

        Uses debouncing to prevent excessive S3 uploads. Multiple requests
        within the debounce period are collapsed into a single backup.

        Side Effects:
            - Starts background thread if not already running
            - Updates last_backup_request timestamp
        """
        if not self.config.enabled:
            s3_logger.backup_skip_reason('s3_not_enabled', 'backup_requested')
            return

        with self.backup_lock:
            self.last_backup_request = time.time()
            # Start debounce thread if not already running; at most one
            # debounce thread exists at a time, so _debounced_backup must
            # itself guarantee the backup eventually runs (see below).
            if self._debounce_thread is None or not self._debounce_thread.is_alive():
                self._debounce_thread = threading.Thread(
                    target=self._debounced_backup,
                    daemon=True
                )
                self._debounce_thread.start()
                logger.debug("Started debounce thread for backup")

    def _debounced_backup(self) -> None:
        """
        Wait until a full debounce period passes with no new backup
        requests, then execute exactly one backup.

        Runs in a background daemon thread.

        Fix: the previous implementation returned early when a newer
        request arrived during its sleep, on the assumption that "another
        request" would perform the backup — but request_backup() never
        starts a second thread while this one is alive, so the backup was
        silently dropped for any burst of requests. This version loops,
        extending the wait until the system has been quiet for a full
        debounce period, guaranteeing one backup per burst.
        """
        debounce_seconds = self.config.debounce_seconds
        while True:
            time.sleep(debounce_seconds)
            with self.backup_lock:
                if time.time() - self.last_backup_request >= debounce_seconds:
                    break
            logger.debug("Backup debounce extended - newer request pending")

        try:
            self._execute_backup()
        except Exception as e:
            # Background thread: there is no caller to propagate to, so
            # permanent errors (corruption, credentials) are logged here.
            logger.error(f"Background backup failed: {e}", exc_info=True)

    def execute_backup_now(self) -> bool:
        """
        Synchronous method to execute backup immediately, bypassing debounce.

        Returns:
            True if backup succeeded, False if it failed

        Raises:
            S3CredentialsError: Invalid S3 credentials
            S3BucketNotFoundError: Bucket does not exist
            DatabaseCorruptedError: Source database failed integrity check
        """
        if not self.config.enabled:
            logger.warning("Backup requested but S3 is disabled")
            return False
        return self._execute_backup()

    def _execute_backup(self) -> bool:
        """
        Execute the actual backup operation.

        Process:
        1. Check database integrity
        2. Create hot backup using sqlite3.backup()
        3. Calculate checksum
        4. Upload to S3 with retries
        5. Clean up temp files

        Returns:
            True if backup succeeded, False otherwise

        Raises:
            DatabaseCorruptedError: Source database failed integrity check
            S3CredentialsError: Invalid S3 credentials (permanent failure)
            S3BucketNotFoundError: Configured bucket does not exist
        """
        start_time = time.time()
        db_size = os.path.getsize(self.db_path)
        s3_logger.backup_started(self.db_path, db_size)
        logger.info(f"Starting backup of {self.db_path} ({db_size} bytes)")

        temp_path = None
        try:
            # Validate source database integrity before copying anything
            if not self._validate_database(self.db_path):
                raise DatabaseCorruptedError("Source database failed integrity check")

            # Create backup using sqlite3.backup() API (safe under concurrent writes)
            temp_path = f"{self.db_path}.backup"
            self._create_hot_backup(temp_path)

            # Calculate checksum of the backup copy (not the live database)
            checksum = self._calculate_checksum(temp_path)

            # Upload to S3 under a UTC-timestamped key (second resolution).
            # Fix: datetime.utcnow() is deprecated and naive; use an aware
            # UTC datetime — the formatted key is byte-identical.
            timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d-%H-%M-%S')
            s3_key = f"contacts-{timestamp}.db"
            if not self._upload_to_s3(temp_path, s3_key, checksum):
                return False

            # Success
            duration = time.time() - start_time
            upload_size = os.path.getsize(temp_path)
            s3_logger.backup_completed(duration, s3_key, upload_size)
            logger.info(f"Backup completed successfully: {s3_key} ({duration:.2f}s)")
            return True

        except DatabaseCorruptedError as e:
            s3_logger.backup_failed(str(e))
            logger.error(f"Backup failed - database corrupted: {e}")
            raise
        except (S3CredentialsError, S3BucketNotFoundError) as e:
            # Fix: execute_backup_now() documents these as raised to the
            # caller, but the previous blanket `except Exception` swallowed
            # them into a plain False return. Re-raise permanent S3 errors;
            # the background path catches and logs them instead.
            s3_logger.backup_failed(str(e))
            logger.error(f"Backup failed - permanent S3 error: {e}")
            raise
        except Exception as e:
            s3_logger.backup_failed(str(e))
            logger.error(f"Backup failed: {e}", exc_info=True)
            return False
        finally:
            # Clean up temp file regardless of outcome
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                    logger.debug(f"Cleaned up temp file: {temp_path}")
                except OSError as e:
                    logger.warning(f"Failed to clean up temp file: {e}")

    def _create_hot_backup(self, dest_path: str) -> None:
        """
        Create a hot backup of the database using sqlite3.backup() API.

        This method is safe to use while the database is being written to,
        as sqlite3.backup() handles concurrent access properly.

        Args:
            dest_path: Path where backup should be created
        """
        logger.debug(f"Creating hot backup to {dest_path}")

        # Connect to source database
        source_conn = sqlite3.connect(self.db_path)
        try:
            # Create destination connection
            dest_conn = sqlite3.connect(dest_path)
            try:
                # Execute hot backup (copies all pages to dest)
                source_conn.backup(dest_conn)
                logger.debug("Hot backup completed")
            finally:
                dest_conn.close()
        finally:
            source_conn.close()

    def _calculate_checksum(self, file_path: str) -> str:
        """
        Calculate SHA-256 checksum of a file.

        Args:
            file_path: Path to file

        Returns:
            Hexadecimal SHA-256 checksum string
        """
        sha256_hash = hashlib.sha256()
        with open(file_path, 'rb') as f:
            # Read in chunks for memory efficiency
            for chunk in iter(lambda: f.read(8192), b''):
                sha256_hash.update(chunk)
        checksum = sha256_hash.hexdigest()
        logger.debug(f"Calculated checksum: {checksum}")
        return checksum

    def _upload_to_s3(
        self,
        file_path: str,
        s3_key: str,
        checksum: str,
        max_retries: int = 3
    ) -> bool:
        """
        Upload file to S3 with retry logic and exponential backoff.

        Args:
            file_path: Path to file to upload
            s3_key: S3 object key
            checksum: SHA-256 checksum to store in metadata
            max_retries: Maximum number of retry attempts

        Returns:
            True if upload succeeded, False otherwise

        Raises:
            S3BucketNotFoundError: Bucket does not exist (permanent)
            S3CredentialsError: Credentials rejected (permanent)
        """
        file_size = os.path.getsize(file_path)

        for attempt in range(max_retries):
            try:
                logger.debug(f"Uploading to S3: {s3_key} (attempt {attempt + 1}/{max_retries})")
                # Upload with metadata
                with open(file_path, 'rb') as f:
                    self.s3_client.upload_fileobj(
                        f,
                        self.config.bucket,
                        s3_key,
                        ExtraArgs={
                            'Metadata': {
                                'sha256': checksum,
                                # NOTE(review): os.uname() is POSIX-only; use
                                # platform.node() if Windows support is needed.
                                'source_host': os.uname().nodename,
                                'db_version': str(sqlite3.sqlite_version)
                            }
                        }
                    )
                logger.info(f"Upload successful: {s3_key} ({file_size} bytes)")
                return True

            except ClientError as e:
                error_code = e.response['Error']['Code']

                # Permanent errors - don't retry
                if error_code in ['NoSuchBucket', 'AccessDenied', 'InvalidAccessKeyId']:
                    s3_logger.backup_failed(error_code, attempt + 1, max_retries)
                    logger.error(f"Permanent S3 error: {error_code}")
                    if error_code == 'NoSuchBucket':
                        raise S3BucketNotFoundError(f"Bucket not found: {self.config.bucket}")
                    # AccessDenied or InvalidAccessKeyId
                    raise S3CredentialsError(f"Invalid credentials: {error_code}")

                # Transient errors - retry with backoff
                if attempt < max_retries - 1:
                    backoff = 2 ** attempt  # 1s, 2s, 4s
                    logger.warning(
                        f"S3 upload failed (attempt {attempt + 1}/{max_retries}): {error_code}, "
                        f"retrying in {backoff}s"
                    )
                    time.sleep(backoff)
                else:
                    s3_logger.backup_failed(error_code, attempt + 1, max_retries)
                    logger.error(f"S3 upload failed after {max_retries} attempts: {error_code}")
                    return False

            except Exception as e:
                s3_logger.backup_failed(str(e), attempt + 1, max_retries)
                logger.error(f"Unexpected error during S3 upload: {e}", exc_info=True)
                return False

        return False

    def _validate_database(self, db_path: str) -> bool:
        """
        Validate SQLite database integrity via PRAGMA integrity_check.

        Args:
            db_path: Path to database file

        Returns:
            True if database passes integrity check, False otherwise
        """
        try:
            conn = sqlite3.connect(db_path)
            try:
                cursor = conn.cursor()
                cursor.execute("PRAGMA integrity_check")
                # integrity_check returns a single row 'ok' when healthy
                result = cursor.fetchone()[0]
            finally:
                conn.close()

            if result == 'ok':
                logger.debug(f"Database integrity check passed: {db_path}")
                return True
            logger.error(f"Database integrity check failed: {result}")
            return False
        except Exception as e:
            logger.error(f"Database validation failed: {e}")
            return False

    def get_latest_backup(self) -> Optional[BackupMetadata]:
        """
        Query S3 for the latest backup file.

        Returns:
            BackupMetadata object with latest backup info, or None if no
            backups found (or S3 is disabled)

        Raises:
            S3ConnectionError: Network or S3 service error
        """
        if not self.config.enabled:
            return None

        try:
            logger.debug(f"Querying S3 for latest backup in bucket: {self.config.bucket}")
            response = self.s3_client.list_objects_v2(
                Bucket=self.config.bucket,
                Prefix='contacts-'
            )

            if 'Contents' not in response or len(response['Contents']) == 0:
                logger.info("No backups found in S3")
                return None

            # Find latest by LastModified
            latest = max(response['Contents'], key=lambda x: x['LastModified'])

            # Get checksum from object metadata if available (best-effort)
            try:
                head_response = self.s3_client.head_object(
                    Bucket=self.config.bucket,
                    Key=latest['Key']
                )
                checksum = head_response.get('Metadata', {}).get('sha256')
            except Exception as e:
                logger.warning(f"Failed to get metadata for {latest['Key']}: {e}")
                checksum = None

            metadata = BackupMetadata(
                s3_key=latest['Key'],
                last_modified=latest['LastModified'],
                size_bytes=latest['Size'],
                checksum_sha256=checksum
            )
            logger.info(f"Latest backup: {metadata.s3_key} ({metadata.last_modified})")
            return metadata

        except ClientError as e:
            error_code = e.response['Error']['Code']
            logger.error(f"S3 error querying backups: {error_code}")
            raise S3ConnectionError(f"S3 error: {error_code}") from e
        except Exception as e:
            logger.error(f"Unexpected error querying backups: {e}")
            raise S3ConnectionError(f"Error querying backups: {e}") from e