Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| import os | |
| import sys | |
| import subprocess | |
| import datetime | |
| import sqlite3 | |
| import tempfile | |
| from pathlib import Path | |
| from huggingface_hub import HfApi, hf_hub_download | |
# Put the parent of this script's directory on sys.path so the application
# module can be imported.
SCRIPT_DIR = Path(__file__).parent.resolve()
BACKEND_DIR = SCRIPT_DIR.parent
sys.path.append(os.fspath(BACKEND_DIR))

# Location of the live application database; the directory can be overridden
# through the DATA_DIR environment variable.
DATA_DIR = os.getenv("DATA_DIR", "/app/backend/data")
DB_FILE_PATH = os.path.join(DATA_DIR, "webui.db")

# Paths of the backup artifacts inside the Hugging Face repository.
REPO_DB_GPG_FILE = "db_backup/webui.db.gpg"
REPO_TIMESTAMP_FILE = "db_backup/last_backup_time.txt"
def check_gpg():
    """Report whether the ``gpg`` executable can be run.

    Runs ``gpg --version`` and treats a failure to launch the program or a
    non-zero exit status as "not available".

    Returns:
        True if ``gpg --version`` ran and exited successfully, False otherwise
        (a message explaining the failure is printed).
    """
    try:
        subprocess.run(["gpg", "--version"], check=True, capture_output=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: gpg is not installed or not in PATH")
        return False
    except OSError as exc:
        # Launch failures other than "not found" (for example PermissionError
        # when a file named gpg is on PATH but is not executable) must also
        # yield False rather than escape as a traceback.
        print(f"Error: gpg could not be executed: {exc}")
        return False
    return True
def validate_environment():
    """Check that every environment variable the restore needs is populated.

    A variable counts as missing when it is unset or set to an empty string.

    Returns:
        True when BACKUP_PASSPHRASE, HF_TOKEN and SPACE_ID are all non-empty;
        otherwise False, after printing the names that are missing.
    """
    absent = []
    for name in ("BACKUP_PASSPHRASE", "HF_TOKEN", "SPACE_ID"):
        if not os.environ.get(name):
            absent.append(name)

    if not absent:
        return True

    print(f"Error: Missing environment variables: {', '.join(absent)}")
    return False
def ensure_data_dir():
    """Create the database directory (DATA_DIR) if it does not exist yet.

    Returns:
        True if the directory exists or was created, False if creation raised
        (the error is printed).
    """
    try:
        os.makedirs(DATA_DIR, mode=0o755, exist_ok=True)
    except Exception as err:
        print(f"Error creating data directory: {err}")
        return False
    return True
def get_latest_backup_info(repo_id, hf_token):
    """Look for an encrypted backup in the Space repository.

    Args:
        repo_id: Identifier of the Hugging Face Space repository to inspect.
        hf_token: Access token passed to the Hugging Face Hub calls.

    Returns:
        A ``(exists, timestamp)`` tuple. ``exists`` is True when the encrypted
        database file is listed in the repository. ``timestamp`` is the
        ``datetime`` parsed from the timestamp file, or None when that file is
        absent or cannot be downloaded or parsed.

        NOTE: an error while listing the repository is also reported as
        ``(False, None)``, so callers cannot tell it apart from "no backup".
    """
    api = HfApi()
    try:
        repo_files = api.list_repo_files(
            repo_id=repo_id, repo_type="space", token=hf_token
        )

        if REPO_DB_GPG_FILE not in repo_files:
            print("No backup file found in repository")
            return False, None

        # A backup without a timestamp file is still a usable backup.
        if REPO_TIMESTAMP_FILE not in repo_files:
            return True, None

        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                repo_type="space",
                filename=REPO_TIMESTAMP_FILE,
                token=hf_token,
            )
            with open(local_path, "r", encoding="utf-8") as handle:
                raw_stamp = handle.read().strip()
            backup_time = datetime.datetime.fromisoformat(raw_stamp)
            print(f"Found backup from: {backup_time} UTC")
            return True, backup_time
        except Exception as e:
            # The timestamp is informational only; the backup itself exists.
            print(f"Could not read timestamp: {e}")
            return True, None
    except Exception as e:
        print(f"Error checking repository: {e}")
        return False, None
def decrypt_database_from_memory(encrypted_data, passphrase):
    """Decrypt an in-memory GPG blob and install it as the application database.

    The ciphertext is staged inside a private temporary directory that also
    serves as a throwaway GPG home. gpg decrypts into a scratch file next to
    ``DB_FILE_PATH``; the scratch file replaces ``DB_FILE_PATH`` only after gpg
    has succeeded and produced non-empty output, so a failed decryption never
    overwrites an existing database.

    Args:
        encrypted_data: Raw bytes of the GPG-encrypted backup.
        passphrase: Passphrase (str) used to decrypt the backup.

    Returns:
        True if the database was decrypted and moved to ``DB_FILE_PATH``,
        False otherwise (the reason is printed).
    """
    # Same directory as the final file so os.replace() is an atomic rename.
    partial_path = f"{DB_FILE_PATH}.partial"
    try:
        with tempfile.TemporaryDirectory(prefix='gpg_home_') as gpg_home:
            os.chmod(gpg_home, 0o700)

            # Stage the ciphertext inside the private directory; it is removed
            # together with the directory when the context exits.
            encrypted_path = os.path.join(gpg_home, "backup.gpg")
            with open(encrypted_path, "wb") as encrypted_file:
                encrypted_file.write(encrypted_data)

            print(f"Decrypting database ({len(encrypted_data)} bytes)...")
            decrypt_cmd = [
                "gpg",
                "--batch",
                "--yes",
                "--homedir", gpg_home,
                "--pinentry-mode", "loopback",
                # Read the passphrase from stdin instead of passing it as a
                # command-line argument, where it would be visible in the
                # process list.
                "--passphrase-fd", "0",
                "-d",
                "-o", partial_path,
                encrypted_path,
            ]
            subprocess.run(
                decrypt_cmd,
                input=passphrase.encode("utf-8") + b"\n",
                capture_output=True,
                check=True,
            )

        if os.path.exists(partial_path) and os.path.getsize(partial_path) > 0:
            os.replace(partial_path, DB_FILE_PATH)
            print(f"Database decrypted successfully ({os.path.getsize(DB_FILE_PATH)} bytes)")
            return True

        print("Error: Decrypted database is missing or empty")
        return False
    except subprocess.CalledProcessError as e:
        # gpg's stderr is not guaranteed to be valid UTF-8.
        print(f"Decryption failed: {e.stderr.decode(errors='replace')}")
        return False
    except Exception as e:
        print(f"Decryption error: {e}")
        return False
    finally:
        # Never leave a partially written plaintext file behind.
        if os.path.exists(partial_path):
            os.unlink(partial_path)
def verify_database():
    """Check that the restored database is a healthy, non-empty SQLite file.

    Runs ``PRAGMA integrity_check`` and requires at least one table to be
    present in ``sqlite_master``.

    Returns:
        True if the file at ``DB_FILE_PATH`` exists, passes the integrity
        check and contains at least one table; False otherwise (the reason is
        printed).
    """
    if not os.path.exists(DB_FILE_PATH):
        print(f"Error: Database file not found at {DB_FILE_PATH}")
        return False

    try:
        print("Verifying database integrity...")
        conn = sqlite3.connect(DB_FILE_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute("PRAGMA integrity_check;")
            result = cursor.fetchone()[0]
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = cursor.fetchall()
        finally:
            # Using the connection as a context manager would only manage the
            # transaction; close explicitly so the file handle is released.
            conn.close()

        if result.lower() == "ok" and tables:
            print(f"Database verified: {len(tables)} tables found")
            return True

        print("Database verification failed")
        return False
    except Exception as e:
        print(f"Database verification error: {e}")
        return False
def restore_db():
    """Restore the application database from the encrypted backup in the Space.

    Steps: check prerequisites (gpg, environment variables, data directory),
    look for a backup, download it, read it into memory, decrypt it to
    ``DB_FILE_PATH`` and verify the result.

    Returns:
        True if the database was restored and verified, or if no backup was
        reported (the application then starts with a fresh database). False if
        a prerequisite is missing or if download, decryption or verification
        fails.
    """
    prerequisites_ok = check_gpg() and validate_environment() and ensure_data_dir()
    if not prerequisites_ok:
        return False

    passphrase = os.environ["BACKUP_PASSPHRASE"]
    hf_token = os.environ["HF_TOKEN"]
    space_id = os.environ["SPACE_ID"]

    # The backup timestamp is only reported by the helper; it is not used here.
    found, _ = get_latest_backup_info(space_id, hf_token)
    if not found:
        print("No backup found - starting with fresh database")
        return True

    try:
        print("Downloading encrypted database...")
        local_copy = hf_hub_download(
            repo_id=space_id,
            repo_type="space",
            filename=REPO_DB_GPG_FILE,
            token=hf_token,
        )

        # Load the whole ciphertext into memory before decrypting.
        with open(local_copy, 'rb') as handle:
            blob = handle.read()
        print(f"Downloaded encrypted data: {len(blob)} bytes")

        if not decrypt_database_from_memory(blob, passphrase):
            print("Failed to decrypt database")
            return False

        if verify_database():
            print("Database restore completed successfully!")
            return True

        # Drop a database that failed verification.
        print("Failed to verify database")
        if os.path.exists(DB_FILE_PATH):
            os.unlink(DB_FILE_PATH)
        return False
    except Exception as e:
        print(f"Restore error: {e}")
        if os.path.exists(DB_FILE_PATH):
            os.unlink(DB_FILE_PATH)
        return False
if __name__ == "__main__":
    # Exit status 0 when restore_db() reports success, 1 otherwise.
    sys.exit(0 if restore_db() else 1)