#!/usr/bin/env python """ Database backup and restore script. Supports PostgreSQL backup/restore with compression. """ import os import sys import subprocess import argparse from datetime import datetime from pathlib import Path # Configuration DEFAULT_BACKUP_DIR = "backups" DEFAULT_RETENTION_DAYS = 7 def get_database_url(): """Get database URL from environment.""" return os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/demo_project") def get_backup_dir(): """Get or create backup directory.""" backup_dir = Path(os.getenv("BACKUP_DIR", DEFAULT_BACKUP_DIR)) backup_dir.mkdir(exist_ok=True) return backup_dir def create_backup(backup_name: str = None) -> str: """ Create a database backup. Returns the backup file path. """ db_url = get_database_url() backup_dir = get_backup_dir() # Generate backup name if not provided if not backup_name: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_name = f"backup_{timestamp}.sql" backup_path = backup_dir / backup_name # Extract connection parameters from DATABASE_URL # Format: postgresql://user:password@host:port/dbname if "@" not in db_url: print("Error: Invalid DATABASE_URL format") sys.exit(1) # Build pg_dump command # Use environment variable to avoid exposing password in process list env = os.environ.copy() env["PGPASSWORD"] = db_url.split(":")[-1].split("@")[0].split("//")[-1] # Extract connection parts connection = db_url.replace("postgresql://", "") user_host = connection.split("@") user = user_host[0].split(":")[0] host_port_db = user_host[1].split("/") host_port = host_port_db[0].split(":") host = host_port[0] port = host_port[1] if len(host_port) > 1 else "5432" dbname = host_port_db[1].split("?")[0] cmd = [ "pg_dump", "-h", host, "-p", port, "-U", user, "-F", "c", # Custom format (compressed) "-b", # Include large objects "-v", # Verbose "-f", str(backup_path), dbname, ] print(f"Creating backup: {backup_path}") print(f"Database: {dbname}@{host}:{port}") try: result = subprocess.run( cmd, env=env, check=True, capture_output=True, text=True ) print("Backup created successfully!") # Get file size size = backup_path.stat().st_size print(f"Backup size: {size / (1024*1024):.2f} MB") return str(backup_path) except subprocess.CalledProcessError as e: print(f"Backup failed: {e.stderr}") sys.exit(1) def list_backups() -> list: """List all available backups.""" backup_dir = get_backup_dir() backups = sorted(backup_dir.glob("*.sql"), key=lambda p: p.stat().st_mtime, reverse=True) if not backups: print("No backups found") return [] print("\nAvailable backups:") print("-" * 60) for backup in backups: size = backup.stat().st_size / (1024 * 1024) mtime = datetime.fromtimestamp(backup.stat().st_mtime) print(f"{backup.name:30} {size:>8.2f} MB {mtime.strftime('%Y-%m-%d %H:%M')}") print("-" * 60) return [str(b) for b in backups] def restore_backup(backup_file: str, target_db: str = None) -> None: """ Restore a database backup. """ db_url = get_database_url() backup_path = Path(backup_file) if not backup_path.exists(): print(f"Error: Backup file not found: {backup_file}") sys.exit(1) # Build connection parameters env = os.environ.copy() env["PGPASSWORD"] = db_url.split(":")[-1].split("@")[0].split("//")[-1] connection = db_url.replace("postgresql://", "") user_host = connection.split("@") user = user_host[0].split(":")[0] host_port_db = user_host[1].split("/") host_port = host_port_db[0].split(":") host = host_port[0] port = host_port[1] if len(host_port) > 1 else "5432" dbname = target_db or host_port_db[1].split("?")[0] # Drop existing database and recreate print(f"Dropping existing database: {dbname}") drop_cmd = [ "dropdb", "--if-exists", "-h", host, "-p", port, "-U", user, dbname, ] try: subprocess.run(drop_cmd, env=env, check=True) except subprocess.CalledProcessError as e: print(f"Warning: Could not drop database: {e}") print("Trying to restore into existing database...") print(f"Creating database: {dbname}") create_cmd = [ "createdb", "-h", host, "-p", port, "-U", user, dbname, ] try: subprocess.run(create_cmd, env=env, check=True) except subprocess.CalledProcessError as e: print(f"Error creating database: {e}") sys.exit(1) # Restore backup print(f"Restoring backup: {backup_file}") cmd = [ "pg_restore", "-h", host, "-p", port, "-U", user, "-d", dbname, "-v", # Verbose str(backup_path), ] try: result = subprocess.run(cmd, env=env, check=True, capture_output=True, text=True) print("Restore completed successfully!") except subprocess.CalledProcessError as e: print(f"Restore failed: {e.stderr}") sys.exit(1) def cleanup_old_backups(retention_days: int = DEFAULT_RETENTION_DAYS) -> None: """Remove backups older than retention period.""" backup_dir = get_backup_dir() cutoff = datetime.now().timestamp() - (retention_days * 86400) removed = 0 for backup in backup_dir.glob("*.sql"): if backup.stat().st_mtime < cutoff: backup.unlink() removed += 1 print(f"Removed old backup: {backup.name}") if removed > 0: print(f"Cleaned up {removed} old backup(s)") else: print("No old backups to clean up") def main(): parser = argparse.ArgumentParser(description="Database backup and restore utility") subparsers = parser.add_subparsers(dest="command", help="Available commands") # Create backup backup_parser = subparsers.add_parser("create", help="Create a database backup") backup_parser.add_argument("-n", "--name", help="Backup file name") # List backups subparsers.add_parser("list", help="List all backups") # Restore backup restore_parser = subparsers.add_parser("restore", help="Restore a backup") restore_parser.add_argument("file", help="Backup file to restore") restore_parser.add_argument("-d", "--database", help="Target database name") # Cleanup old backups cleanup_parser = subparsers.add_parser("cleanup", help="Clean up old backups") cleanup_parser.add_argument( "--days", type=int, default=DEFAULT_RETENTION_DAYS, help=f"Retention days (default: {DEFAULT_RETENTION_DAYS})" ) args = parser.parse_args() if args.command == "create": create_backup(args.name) elif args.command == "list": list_backups() elif args.command == "restore": restore_backup(args.file, args.database) elif args.command == "cleanup": cleanup_old_backups(args.days) else: parser.print_help() if __name__ == "__main__": main()