|
|
|
|
|
import os |
|
|
import sys |
|
|
import subprocess |
|
|
import datetime |
|
|
import sqlite3 |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
from huggingface_hub import HfApi, hf_hub_download |
|
|
|
|
|
|
|
|
# Directory that contains this script, resolved to an absolute path.
SCRIPT_DIR = Path(__file__).parent.resolve()

# Parent of the script directory; appended to the import search path below.
BACKEND_DIR = SCRIPT_DIR.parent
sys.path.append(os.fspath(BACKEND_DIR))
|
|
|
|
|
|
|
|
# Local directory holding the database; overridable via the DATA_DIR
# environment variable.
DATA_DIR = os.getenv("DATA_DIR", "/app/backend/data")

# Full path of the SQLite database file inside DATA_DIR.
DB_FILE_PATH = os.path.join(DATA_DIR, "webui.db")

# Paths, inside the Hugging Face repository, of the encrypted database backup
# and of the text file recording when that backup was taken.
REPO_DB_GPG_FILE = "db_backup/webui.db.gpg"
REPO_TIMESTAMP_FILE = "db_backup/last_backup_time.txt"
|
|
|
|
|
|
|
|
def check_gpg():
    """Report whether the ``gpg`` executable can be run.

    Runs ``gpg --version`` with its output captured.

    Returns:
        True if the command exits successfully; False (after printing an
        error message) if the executable is not found or exits non-zero.
    """
    probe_cmd = ["gpg", "--version"]
    try:
        subprocess.run(probe_cmd, check=True, capture_output=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: gpg is not installed or not in PATH")
        return False
    return True
|
|
|
|
|
|
|
|
def validate_environment():
    """Check that every required environment variable has a non-empty value.

    The required variables are BACKUP_PASSPHRASE, HF_TOKEN and SPACE_ID.

    Returns:
        True when all of them are set to a non-empty string; otherwise
        prints the names of the missing ones and returns False.
    """
    missing = []
    for name in ("BACKUP_PASSPHRASE", "HF_TOKEN", "SPACE_ID"):
        # An unset variable and an empty string both count as missing.
        if not os.environ.get(name):
            missing.append(name)

    if not missing:
        return True

    print(f"Error: Missing environment variables: {', '.join(missing)}")
    return False
|
|
|
|
|
|
|
|
def ensure_data_dir():
    """Create DATA_DIR if it does not already exist.

    The directory is requested with mode 0o755; an already-existing
    directory is not an error.

    Returns:
        True if the directory exists or was created; False (after printing
        the error) if creation raised an exception.
    """
    try:
        os.makedirs(DATA_DIR, mode=0o755, exist_ok=True)
    except Exception as exc:
        print(f"Error creating data directory: {exc}")
        return False
    else:
        return True
|
|
|
|
|
|
|
|
def get_latest_backup_info(repo_id, hf_token):
    """Look for an encrypted backup in the Space repository.

    Args:
        repo_id: Identifier of the Hugging Face Space repository to inspect.
        hf_token: Access token passed to the Hugging Face API calls.

    Returns:
        A ``(backup_exists, timestamp)`` tuple. ``backup_exists`` is True
        when REPO_DB_GPG_FILE is listed in the repository. ``timestamp`` is
        the ``datetime`` parsed from REPO_TIMESTAMP_FILE, or None when that
        file is absent or cannot be downloaded/parsed.

    NOTE(review): any exception while listing the repository is also
    reported as ``(False, None)``, i.e. indistinguishable from "no backup".
    """
    api = HfApi()
    try:
        repo_files = api.list_repo_files(
            repo_id=repo_id, repo_type="space", token=hf_token
        )

        if REPO_DB_GPG_FILE not in repo_files:
            print("No backup file found in repository")
            return False, None

        # A backup without a timestamp file is still a usable backup.
        if REPO_TIMESTAMP_FILE not in repo_files:
            return True, None

        try:
            local_timestamp_path = hf_hub_download(
                repo_id=repo_id,
                repo_type="space",
                filename=REPO_TIMESTAMP_FILE,
                token=hf_token,
            )
            with open(local_timestamp_path, "r", encoding="utf-8") as handle:
                raw_timestamp = handle.read().strip()
            backup_time = datetime.datetime.fromisoformat(raw_timestamp)
            print(f"Found backup from: {backup_time} UTC")
            return True, backup_time
        except Exception as exc:
            # The timestamp is informational only; failing to read it must
            # not hide the fact that a backup exists.
            print(f"Could not read timestamp: {exc}")
            return True, None
    except Exception as exc:
        print(f"Error checking repository: {exc}")
        return False, None
|
|
|
|
|
|
|
|
def decrypt_database_from_memory(encrypted_data, passphrase, output_path=None):
    """Decrypt an in-memory GPG blob into the database file.

    The encrypted bytes are written to a file inside a private, temporary
    GPG home directory, decrypted with ``gpg`` into a staging file next to
    the destination, and only moved over the destination once a non-empty
    result has been produced. An existing database is therefore left
    untouched when decryption fails.

    Args:
        encrypted_data: Raw bytes of the GPG-encrypted database.
        passphrase: Passphrase for decryption. It is handed to gpg through
            stdin rather than the command line, so it does not show up in
            process listings.
        output_path: Destination file for the decrypted database. Defaults
            to DB_FILE_PATH when omitted.

    Returns:
        True if a non-empty database was written to the destination; False
        (after printing the reason) on any failure.
    """
    target_path = DB_FILE_PATH if output_path is None else output_path
    # Stage in the destination's directory so os.replace stays on a single
    # filesystem and the final swap is one rename.
    staging_path = f"{target_path}.restoring"

    try:
        with tempfile.TemporaryDirectory(prefix="gpg_home_") as gpg_home:
            os.chmod(gpg_home, 0o700)

            # Keeping the encrypted copy inside the private GPG home means it
            # is removed together with that directory on every exit path.
            encrypted_path = os.path.join(gpg_home, "backup.gpg")
            with open(encrypted_path, "wb") as encrypted_file:
                encrypted_file.write(encrypted_data)

            print(f"Decrypting database ({len(encrypted_data)} bytes)...")
            decrypt_cmd = [
                "gpg",
                "--batch",
                "--yes",
                "--homedir", gpg_home,
                "--pinentry-mode", "loopback",
                # Read the passphrase from stdin (fd 0) instead of argv.
                "--passphrase-fd", "0",
                "-d",
                "-o", staging_path,
                encrypted_path,
            ]
            subprocess.run(
                decrypt_cmd,
                # gpg takes the first line of the fd as the passphrase.
                input=os.fsencode(passphrase) + b"\n",
                capture_output=True,
                check=True,
            )

            if not os.path.exists(staging_path) or os.path.getsize(staging_path) == 0:
                print("Error: Decrypted database is missing or empty")
                return False

            os.replace(staging_path, target_path)
            print(
                f"Database decrypted successfully ({os.path.getsize(target_path)} bytes)"
            )
            return True
    except subprocess.CalledProcessError as e:
        # gpg's stderr is not guaranteed to be valid UTF-8; never let the
        # error report itself raise.
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace")
        print(f"Decryption failed: {stderr_text}")
        return False
    except Exception as e:
        print(f"Decryption error: {e}")
        return False
    finally:
        # Best-effort removal of a leftover staging file; after a successful
        # os.replace it no longer exists, which is the normal case.
        try:
            os.unlink(staging_path)
        except OSError:
            pass
|
|
|
|
|
|
|
|
def verify_database(db_path=None):
    """Verify that a restored SQLite database is intact and non-empty.

    Runs ``PRAGMA integrity_check`` and counts the tables listed in
    ``sqlite_master``.

    Args:
        db_path: Path of the database file to check. Defaults to
            DB_FILE_PATH when omitted.

    Returns:
        True if the integrity check reports "ok" and at least one table
        exists; False (after printing the reason) if the file is missing,
        the check fails, no tables are found, or any error occurs.
    """
    path = DB_FILE_PATH if db_path is None else db_path

    if not os.path.exists(path):
        print(f"Error: Database file not found at {path}")
        return False

    try:
        print("Verifying database integrity...")
        conn = sqlite3.connect(path)
        try:
            cursor = conn.cursor()
            cursor.execute("PRAGMA integrity_check;")
            result = cursor.fetchone()[0]
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = cursor.fetchall()
        finally:
            # Using the connection as a context manager only scopes a
            # transaction; it does not close the handle. Close explicitly so
            # the file is released before a caller deletes or reuses it.
            conn.close()

        if result.lower() == "ok" and len(tables) > 0:
            print(f"Database verified: {len(tables)} tables found")
            return True

        print("Database verification failed")
        return False
    except Exception as e:
        print(f"Database verification error: {e}")
        return False
|
|
|
|
|
|
|
|
def _discard_restored_db():
    """Best-effort removal of the database file at DB_FILE_PATH."""
    try:
        os.unlink(DB_FILE_PATH)
    except FileNotFoundError:
        pass
    except OSError as e:
        # Cleanup must never turn a reported failure into a crash.
        print(f"Warning: could not remove {DB_FILE_PATH}: {e}")


def restore_db():
    """Restore the database from the encrypted backup in the Space repository.

    Steps: check prerequisites (gpg, environment variables, data directory),
    look for a backup, download the encrypted file, decrypt it to
    DB_FILE_PATH and verify the result. A database that fails verification
    is removed again.

    Returns:
        True if the restore succeeded, or if no backup was found (the
        application then starts with a fresh database); False on any
        failure.

    NOTE(review): get_latest_backup_info reports "no backup" when the
    repository check itself fails, so such failures are also treated as a
    fresh start here.
    """
    if not check_gpg() or not validate_environment() or not ensure_data_dir():
        return False

    passphrase = os.environ["BACKUP_PASSPHRASE"]
    hf_token = os.environ["HF_TOKEN"]
    space_id = os.environ["SPACE_ID"]

    # The backup timestamp is only reported by the lookup; it is not needed
    # for the restore itself.
    backup_exists, _ = get_latest_backup_info(space_id, hf_token)
    if not backup_exists:
        print("No backup found - starting with fresh database")
        return True

    try:
        print("Downloading encrypted database...")
        encrypted_file = hf_hub_download(
            repo_id=space_id,
            repo_type="space",
            filename=REPO_DB_GPG_FILE,
            token=hf_token,
        )

        with open(encrypted_file, "rb") as f:
            encrypted_data = f.read()
        print(f"Downloaded encrypted data: {len(encrypted_data)} bytes")

        if not decrypt_database_from_memory(encrypted_data, passphrase):
            print("Failed to decrypt database")
            return False

        if not verify_database():
            print("Failed to verify database")
            # Do not leave a corrupt database behind for the application.
            _discard_restored_db()
            return False

        print("Database restore completed successfully!")
        return True
    except Exception as e:
        print(f"Restore error: {e}")
        _discard_restored_db()
        return False
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Exit status mirrors the restore outcome: 0 on success, 1 on failure.
    sys.exit(0 if restore_db() else 1)
|
|
|