diff --git "a/app.py" "b/app.py"
new file mode 100644
--- /dev/null
+++ "b/app.py"
@@ -0,0 +1,4201 @@
+from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
+from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
+from fastapi.templating import Jinja2Templates
+from fastapi.staticfiles import StaticFiles
+from fastapi.middleware.cors import CORSMiddleware
+import os
+import subprocess
+import threading
+import time
+import json
+import datetime
+import uuid
+import shutil
+from typing import Dict, Any, Optional, List
+from pathlib import Path
+import psycopg2
+import logging
+
+# Setup logging
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+)
+logger = logging.getLogger("pgmigrator")
+
+# Initialize FastAPI app
+app = FastAPI(title="TimescaleDB Migration Tool")
+
+# Enable CORS
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
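+# Note: allowing every origin together with credentials is convenient for a local
+# tool, but overly permissive if the app is exposed on a shared network.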
+
+# Create necessary directories
+os.makedirs("templates", exist_ok=True)
+os.makedirs("dumps", exist_ok=True)
+
+# Setup templates
+templates = Jinja2Templates(directory="templates")
+
+# Create a static files directory for downloads
+static_dir = Path("dumps")
+static_dir.mkdir(exist_ok=True)
+app.mount("/downloads", StaticFiles(directory="dumps"), name="downloads")
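+# Files written to dumps/ are downloadable at /downloads/<name>. Because this mount is
+# registered first, it also handles requests that the /downloads/{file_name} route
+# defined further down would otherwise serve.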
+
+# Global state for migration
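+# (a single dump or restore runs at a time; request handlers, background tasks and
+# the monitoring thread all share this object)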
+migration_state = {
+ "id": str(uuid.uuid4()),
+ "running": False,
+ "operation": None, # "dump" or "restore"
+ "start_time": None,
+ "end_time": None,
+ "dump_file": None,
+ "dump_file_size": 0,
+ "previous_size": 0,
+ "dump_completed": False,
+ "restore_completed": False,
+ "last_activity": time.time(),
+ "log": [],
+ "process": None,
+ "progress": {
+ "current_table": None,
+ "tables_completed": 0,
+ "total_tables": 0,
+ "current_size_mb": 0,
+ "growth_rate_mb_per_sec": 0,
+ "estimated_time_remaining": None,
+ "percent_complete": 0
+ }
+}
+
+# Lock for updating global state
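+# (acquired by handlers, background tasks and the monitoring thread before mutating state)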
+migration_lock = threading.Lock()
+
+def log_message(message: str, level: str = "info", command: Optional[str] = None):
+ """Add timestamped log message with level"""
+ timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ log_entry = {
+ "timestamp": timestamp,
+ "message": message,
+ "level": level,
+ "command": command,
+ "id": len(migration_state["log"])
+ }
+
+ with migration_lock:
+ migration_state["log"].append(log_entry)
+ migration_state["last_activity"] = time.time()
+
+ logger.info(f"[{level.upper()}] {message}")
+ if command:
+ logger.info(f"Command: {command}")
+
+def test_connection(connection_string: str) -> bool:
+ """Test a PostgreSQL connection string"""
+ try:
+ conn = psycopg2.connect(connection_string)
+ conn.close()
+ return True
+ except Exception as e:
+ logger.error(f"Connection test failed: {str(e)}")
+ return False
+
+def get_file_size_mb(file_path: str) -> float:
+ """Get file size in megabytes"""
+ try:
+ size_bytes = os.path.getsize(file_path)
+ return size_bytes / (1024 * 1024) # Convert to MB
+ except Exception:
+ return 0
+
+def monitor_dump_size():
+ """Monitor the dump file size and update state"""
+ while migration_state["running"] and migration_state["operation"] == "dump":
+ try:
+ if migration_state["dump_file"] and os.path.exists(migration_state["dump_file"]):
+ # Get current file size
+ current_size = get_file_size_mb(migration_state["dump_file"])
+
+ # Calculate growth rate
+ elapsed = time.time() - migration_state["start_time"]
+ if elapsed > 0:
+ growth_rate = current_size / elapsed # MB/sec
+
+ # Update progress state
+ with migration_lock:
+ migration_state["dump_file_size"] = current_size
+ migration_state["progress"]["current_size_mb"] = round(current_size, 2)
+ migration_state["progress"]["growth_rate_mb_per_sec"] = round(growth_rate, 2)
+
+ # Update size change for UI display
+ size_change = current_size - migration_state["previous_size"]
+ if size_change > 0:
+ migration_state["previous_size"] = current_size
+ except Exception as e:
+ logger.error(f"Error monitoring dump size: {str(e)}")
+
+ time.sleep(1) # Update every second
+
+def run_dump(source_conn: str, file_path: str, options: dict):
+ """Run pg_dump in a background thread"""
+ try:
+ # Clear any existing file
+ if os.path.exists(file_path):
+ os.remove(file_path)
+
+ # Set environment variables for connection
+ env = os.environ.copy()
+
+ # Build pg_dump command
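+        # -F format codes: c=custom, d=directory, p=plain SQL, t=tar. -v (verbose)
+        # makes pg_dump report per-object progress on stderr, which is parsed below.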
+ format_flag = "-F" + options.get("format", "c") # Default to custom format
+ cmd = ["pg_dump", source_conn, format_flag, "-v", "-f", file_path]
+
+ # Add schema if specified
+ if options.get("schema"):
+ cmd.extend(["-n", options["schema"]])
+
+ # Add compression level if specified
+ if options.get("compression") and options["compression"] != "default":
+            cmd.extend(["-Z", str(options["compression"])])
+
+ log_message(f"Starting database dump to {file_path}", "info", " ".join(cmd))
+
+        # Mark the dump as running before starting the monitor thread, so the
+        # monitor loop does not exit early or read a missing start_time.
+        with migration_lock:
+            migration_state["start_time"] = time.time()
+            migration_state["running"] = True
+            migration_state["operation"] = "dump"
+            migration_state["dump_file"] = file_path
+            migration_state["dump_completed"] = False
+            # Reset per-run progress so counts from a previous operation do not carry over
+            migration_state["progress"]["current_table"] = None
+            migration_state["progress"]["tables_completed"] = 0
+
+        # Start monitoring thread for file size
+        monitor_thread = threading.Thread(target=monitor_dump_size, daemon=True)
+        monitor_thread.start()
+
+        # Start the dump process
+        process = subprocess.Popen(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            env=env,
+            text=True
+        )
+
+        with migration_lock:
+            migration_state["process"] = process
+
+        # pg_dump writes verbose progress to stderr; collect it so errors can be
+        # reported after the stream has been consumed.
+        stderr_lines = []
+        for line in process.stderr:
+            stderr_lines.append(line)
+            # Heuristic parse of lines such as: pg_dump: dumping contents of table "public.metrics"
+            if "dumping contents of table" in line.lower():
+                table_name = line.rsplit("table", 1)[-1].strip().strip('"')
+                with migration_lock:
+                    migration_state["progress"]["current_table"] = table_name
+                    migration_state["progress"]["tables_completed"] += 1
+
+                log_message(f"Dumping table: {table_name}", "info")
+
+        # Wait for process to complete
+        process.wait()
+        exit_code = process.returncode
+
+ if exit_code == 0:
+ # Get final file size
+ final_size = get_file_size_mb(file_path)
+
+ with migration_lock:
+ migration_state["dump_file_size"] = final_size
+ migration_state["progress"]["current_size_mb"] = round(final_size, 2)
+ migration_state["dump_completed"] = True
+ migration_state["end_time"] = time.time()
+ migration_state["running"] = False
+ migration_state["process"] = None
+
+ total_time = migration_state["end_time"] - migration_state["start_time"]
+ log_message(
+ f"Database dump completed successfully. Size: {round(final_size, 2)} MB. Time: {round(total_time, 2)} seconds",
+ "success"
+ )
+ return True
+        else:
+            # Surface the tail of pg_dump's stderr output, which carries the actual error
+            error_message = "".join(stderr_lines[-10:]).strip() or "Unknown error during dump"
+            log_message(f"Database dump failed: {error_message}", "error")
+
+ with migration_lock:
+ migration_state["running"] = False
+ migration_state["process"] = None
+
+ return False
+
+ except Exception as e:
+ log_message(f"Error during database dump: {str(e)}", "error")
+
+ with migration_lock:
+ migration_state["running"] = False
+ migration_state["process"] = None
+
+ return False
+
+def run_restore(target_conn: str, file_path: str, options: dict):
+ """Run pg_restore in a background thread"""
+ try:
+ if not os.path.exists(file_path):
+ log_message(f"Dump file not found: {file_path}", "error")
+ return False
+
+ # Set environment variables for connection
+ env = os.environ.copy()
+
+ # Run timescaledb_pre_restore() if specified
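+        # timescaledb_pre_restore() puts the target database into the right state for
+        # restoring a TimescaleDB dump; it must be paired with timescaledb_post_restore() afterwards.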
+ if options.get("timescaledb_pre_restore", True):
+ pre_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_pre_restore();"]
+
+ log_message("Running timescaledb_pre_restore()", "info", " ".join(pre_restore_cmd))
+
+ pre_restore_process = subprocess.Popen(
+ pre_restore_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env,
+ text=True
+ )
+
+ pre_restore_stdout, pre_restore_stderr = pre_restore_process.communicate()
+ if pre_restore_process.returncode != 0:
+ log_message(f"Pre-restore failed: {pre_restore_stderr}", "error")
+ return False
+
+ # Build pg_restore command
+ cmd = ["pg_restore", "-d", target_conn, "-v"]
+
+ # Add no-owner flag if specified
+ if options.get("no_owner", True):
+ cmd.append("--no-owner")
+
+ # Add clean flag if specified
+ if options.get("clean", False):
+ cmd.append("--clean")
+
+ # Add single transaction flag if specified
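+        # --single-transaction rolls the whole restore back on the first error; combined
+        # with --clean it will abort if the objects to drop do not already exist.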
+ if options.get("single_transaction", True):
+ cmd.append("--single-transaction")
+
+ # Add file path
+ cmd.append(file_path)
+
+ log_message(f"Starting database restore from {file_path}", "info", " ".join(cmd))
+
+        with migration_lock:
+            migration_state["start_time"] = time.time()
+            migration_state["running"] = True
+            migration_state["operation"] = "restore"
+            migration_state["restore_completed"] = False
+            # Reset per-run progress so counts from a previous operation do not carry over
+            migration_state["progress"]["current_table"] = None
+            migration_state["progress"]["tables_completed"] = 0
+
+ process = subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env,
+ text=True
+ )
+
+ with migration_lock:
+ migration_state["process"] = process
+
+        # pg_restore writes verbose progress to stderr; collect it so errors can be
+        # reported after the stream has been consumed.
+        stderr_lines = []
+        for line in process.stderr:
+            stderr_lines.append(line)
+            if "processing" in line.lower() and "table" in line.lower():
+                try:
+                    # Heuristic parse of lines such as: pg_restore: processing data for table "public.metrics"
+                    table_name = line.rsplit("table", 1)[-1].strip().strip('"')
+                    with migration_lock:
+                        migration_state["progress"]["current_table"] = table_name
+                        migration_state["progress"]["tables_completed"] += 1
+
+                    log_message(f"Restoring table: {table_name}", "info")
+                except Exception:
+                    pass
+
+        # Wait for process to complete
+        process.wait()
+        exit_code = process.returncode
+
+ # Run timescaledb_post_restore() if specified
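+        # timescaledb_post_restore() returns the database to normal operation after the
+        # restore, and ANALYZE refreshes planner statistics for the newly loaded data.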
+ if exit_code == 0 and options.get("timescaledb_post_restore", True):
+ post_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_post_restore(); ANALYZE;"]
+
+ log_message("Running timescaledb_post_restore() and ANALYZE", "info", " ".join(post_restore_cmd))
+
+ post_restore_process = subprocess.Popen(
+ post_restore_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env,
+ text=True
+ )
+
+ post_restore_stdout, post_restore_stderr = post_restore_process.communicate()
+ if post_restore_process.returncode != 0:
+ log_message(f"Post-restore failed: {post_restore_stderr}", "error")
+
+ if exit_code == 0:
+ with migration_lock:
+ migration_state["restore_completed"] = True
+ migration_state["end_time"] = time.time()
+ migration_state["running"] = False
+ migration_state["process"] = None
+
+ total_time = migration_state["end_time"] - migration_state["start_time"]
+ log_message(
+ f"Database restore completed successfully. Time: {round(total_time, 2)} seconds",
+ "success"
+ )
+ return True
+        else:
+            # Surface the tail of pg_restore's stderr output, which carries the actual error
+            error_message = "".join(stderr_lines[-10:]).strip() or "Unknown error during restore"
+            log_message(f"Database restore failed: {error_message}", "error")
+
+ with migration_lock:
+ migration_state["running"] = False
+ migration_state["process"] = None
+
+ return False
+
+ except Exception as e:
+ log_message(f"Error during database restore: {str(e)}", "error")
+
+ with migration_lock:
+ migration_state["running"] = False
+ migration_state["process"] = None
+
+ return False
+
+def stop_current_process():
+    """Stop the current process if running"""
+    operation = None
+    error = None
+    with migration_lock:
+        if migration_state["process"] and migration_state["running"]:
+            try:
+                migration_state["process"].terminate()
+                migration_state["process"] = None
+                migration_state["running"] = False
+                operation = migration_state["operation"]
+            except Exception as e:
+                error = str(e)
+
+    # Log outside the lock: log_message() acquires migration_lock itself, and a plain
+    # threading.Lock is not reentrant, so logging while holding it would deadlock.
+    if operation:
+        log_message(f"Database {operation} operation stopped by user", "warning")
+        return True
+    if error:
+        log_message(f"Error stopping process: {error}", "error")
+    return False
+
+@app.get("/", response_class=HTMLResponse)
+async def home(request: Request):
+ """Home page with migration UI"""
+    # The UI template is rewritten on every request so the app stays self-contained.
+    with open("templates/index.html", "w") as f:
+        f.write("""<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <title>TimescaleDB Migrator</title>
+</head>
+<body>
+    <header>
+        <h1>TimescaleDB Migrator</h1>
+        <nav>
+            <a href="#connections">Connections</a>
+            <a href="#dump">Dump</a>
+            <a href="#restore">Restore</a>
+            <a href="#logs">Logs</a>
+            <a href="#about">About</a>
+        </nav>
+    </header>
+
+    <section id="connections">
+        <h2>Database Connections</h2>
+
+        <div class="card">
+            <h3>Source Database</h3>
+            <label>Connection String</label>
+            <input type="text">
+            <span class="badge">Not Connected</span>
+            <button type="button">Test Connection</button>
+            <div class="panel">
+                <h4>Connection Info</h4>
+                <p>No Connection</p>
+                <p>Test your connection to view database information</p>
+            </div>
+            <button type="button">Back</button>
+        </div>
+
+        <div class="card">
+            <h3>Target Database</h3>
+            <label>Connection String</label>
+            <input type="text">
+            <span class="badge">Not Connected</span>
+            <button type="button">Test Connection</button>
+            <div class="panel">
+                <h4>Connection Info</h4>
+                <p>No Connection</p>
+                <p>Test your connection to view database information</p>
+            </div>
+            <button type="button">Back</button>
+        </div>
+
+        <div class="card">
+            <h3>Next Steps</h3>
+            <p>After connecting to your databases, proceed to the Dump tab to create a backup
+               or the Restore tab to recover from a backup.</p>
+            <button type="button">Go to Dump</button>
+            <button type="button">Go to Restore</button>
+        </div>
+    </section>
+
+    <section id="dump">
+        <h2>Database Dump</h2>
+
+        <div class="card">
+            <h3>Dump Settings</h3>
+
+            <label>Format</label>
+            <select>
+                <option value="c">Custom (compressed, most flexible)</option>
+                <option value="d">Directory (for largest databases)</option>
+                <option value="p">Plain SQL (less efficient, readable)</option>
+                <option value="t">TAR (older format)</option>
+            </select>
+            <small>The output file format used by pg_dump</small>
+
+            <label>Compression Level</label>
+            <select>
+                <option value="default">Default</option>
+                <option value="0">0 (no compression, fastest)</option>
+                <option value="1">1 (minimal)</option>
+                <option value="3">3</option>
+                <option value="5">5 (moderate, balanced)</option>
+                <option value="7">7</option>
+                <option value="9">9 (maximum compression)</option>
+            </select>
+            <small>Higher compression saves space but can take longer</small>
+
+            <label>Schema Filter (Optional)</label>
+            <input type="text">
+            <small>Leave empty for all schemas</small>
+
+            <label>Output Filename</label>
+            <input type="text" value="timescale_backup">
+            <small>Appropriate file extension will be added automatically</small>
+
+            <button type="button">Start Dump</button>
+            <button type="button">Stop</button>
+        </div>
+
+        <div class="card">
+            <h3>Dump Progress</h3>
+            <div>Elapsed Time <span>00:00:00</span></div>
+            <pre><code>pg_dump postgres://user:***@hostname:5432/database -Fc -v -f timescale_backup.dump</code></pre>
+        </div>
+    </section>
+
+    <section id="restore">
+        <h2>Database Restore</h2>
+
+        <div class="card">
+            <h3>Restore Settings</h3>
+            <button type="button">Start Restore</button>
+            <button type="button">Stop</button>
+        </div>
+
+        <div class="card">
+            <h3>Restore Progress</h3>
+            <div>Elapsed Time <span>00:00:00</span> Since start</div>
+            <pre><code>pg_restore -d postgres://user:***@hostname:5432/database -v --no-owner --single-transaction timescale_backup.dump</code></pre>
+        </div>
+    </section>
+
+    <section id="logs">
+        <h2>Migration Logs</h2>
+        <div class="terminal">
+            <p>$ Welcome to TimescaleDB Migrator</p>
+            <p>$ Ready to execute commands. Check logs below for details.</p>
+        </div>
+        <button type="button">Clear Logs</button>
+        <button type="button">Export Logs</button>
+    </section>
+
+    <section id="about">
+        <h2>About TimescaleDB Migrator</h2>
+
+        <h3>What is TimescaleDB Migrator?</h3>
+        <p>TimescaleDB Migrator is a tool designed to simplify the process of migrating data between
+           TimescaleDB instances using the PostgreSQL native backup and restore utilities:
+           pg_dump and pg_restore.</p>
+
+        <h3>Key Features</h3>
+        <ul>
+            <li><strong>Easy Database Migration:</strong> Migrate your entire TimescaleDB database with just a few clicks</li>
+            <li><strong>Secure Connections:</strong> Support for secure connections with password protection</li>
+            <li><strong>Backup Download:</strong> Download your database backup for safekeeping</li>
+            <li><strong>Real-time Monitoring:</strong> Track the progress of your dump and restore operations</li>
+            <li><strong>TimescaleDB-aware:</strong> Handles TimescaleDB-specific migration requirements</li>
+        </ul>
+
+        <h3>How It Works</h3>
+
+        <h4>Dump Operation</h4>
+        <p>The dump operation uses pg_dump to create a backup of your source database. This backup can be
+           in various formats (custom, directory, plain SQL, or tar) and with different compression levels.</p>
+
+        <h4>Restore Operation</h4>
+        <p>The restore operation uses pg_restore to import your backup into the target database. It includes
+           TimescaleDB-specific pre and post-restore functions to ensure data integrity.</p>
+
+        <h4>Commands Used</h4>
+        <pre><code>pg_dump "postgres://user:password@source-host:5432/source_db" -Fc -v -f ~/timescale_backup.dump</code></pre>
+        <pre><code>psql "postgres://user:password@target-host:5432/target_db" -c "SELECT timescaledb_pre_restore();"</code></pre>
+        <pre><code>pg_restore -d "postgres://user:password@target-host:5432/target_db" -v --no-owner ~/timescale_backup.dump</code></pre>
+        <pre><code>psql "postgres://user:password@target-host:5432/target_db" -c "SELECT timescaledb_post_restore(); ANALYZE;"</code></pre>
+    </section>
+
+    <div id="confirm-modal" hidden>
+        <p>Are you sure you want to proceed with this action?</p>
+    </div>
+</body>
+</html>
+""")
+
+ return templates.TemplateResponse("index.html", {"request": request})
+
+# The endpoint is named test_connection_endpoint so it does not shadow the
+# test_connection() helper defined above, which it calls.
+@app.post("/test-connection")
+async def test_connection_endpoint(data: Dict[str, str]):
+ """Test a database connection"""
+ try:
+ connection_string = data.get("connection_string")
+ connection_type = data.get("connection_type", "source")
+
+ if not connection_string:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "message": "Connection string is required"}
+ )
+
+ # Test the connection
+ if not test_connection(connection_string):
+ return JSONResponse(
+ content={"success": False, "message": "Failed to connect to database"}
+ )
+
+ # Get database info
+ conn = psycopg2.connect(connection_string)
+
+ try:
+ with conn.cursor() as cur:
+ # Get server info
+ cur.execute("SELECT version()")
+ version = cur.fetchone()[0]
+
+ # Check if TimescaleDB is installed
+ is_timescaledb = False
+ try:
+ cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'")
+ ts_version = cur.fetchone()
+ is_timescaledb = ts_version is not None
+                except Exception:
+                    pass
+
+ # Get database name
+ cur.execute("SELECT current_database()")
+ database = cur.fetchone()[0]
+
+ # Extract server info from version string
+ server_match = version.split(" on ")[1].split(",")[0] if " on " in version else "Unknown"
+
+ log_message(f"Successful connection test to {connection_type} database: {database}", "success")
+
+ return JSONResponse(content={
+ "success": True,
+ "version": version,
+ "is_timescaledb": is_timescaledb,
+ "database": database,
+ "server": server_match
+ })
+ finally:
+ conn.close()
+
+ except Exception as e:
+ log_message(f"Connection test failed: {str(e)}", "error")
+ return JSONResponse(
+ content={"success": False, "message": str(e)}
+ )
+
+@app.post("/start-dump")
+async def start_dump(data: Dict[str, Any], background_tasks: BackgroundTasks):
+ """Start a database dump process"""
+ try:
+ source_conn = data.get("source_conn")
+ options = data.get("options", {})
+
+ if not source_conn:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "message": "Source connection string is required"}
+ )
+
+ # Stop any running process
+ stop_current_process()
+
+ # Create dump file path
+ filename = options.get("filename", "timescale_backup")
+ format_flag = options.get("format", "c")
+
+ # Determine file extension
+ extension = ".dump"
+ if format_flag == "p":
+ extension = ".sql"
+ elif format_flag == "d":
+ extension = "" # Directory format has no extension
+ elif format_flag == "t":
+ extension = ".tar"
+
+ # Generate file path
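+        # For the directory format ("d") this path names a directory that pg_dump
+        # creates itself; it must not already exist.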
+ file_path = os.path.join("dumps", f"{filename}{extension}")
+
+ # Start dump in background
+ background_tasks.add_task(run_dump, source_conn, file_path, options)
+
+        # Create command preview with the password redacted; fall back to the raw
+        # string if the connection string has no credentials part
+        if "@" in source_conn:
+            redacted = source_conn.split("@", 1)[0].rsplit(":", 1)[0] + ":***@" + source_conn.split("@", 1)[1]
+        else:
+            redacted = source_conn
+        cmd_preview = f"{redacted} -F{format_flag} -v -f {file_path}"
+
+ return JSONResponse(content={
+ "success": True,
+ "message": "Dump process started",
+ "file_path": file_path,
+ "command_preview": cmd_preview
+ })
+
+ except Exception as e:
+ log_message(f"Failed to start dump: {str(e)}", "error")
+ return JSONResponse(
+ content={"success": False, "message": str(e)}
+ )
+
+@app.post("/start-restore")
+async def start_restore(data: Dict[str, Any], background_tasks: BackgroundTasks):
+ """Start a database restore process"""
+ try:
+ target_conn = data.get("target_conn")
+ dump_file = data.get("dump_file")
+ options = data.get("options", {})
+
+ if not target_conn:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "message": "Target connection string is required"}
+ )
+
+ if not dump_file:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "message": "Dump file is required"}
+ )
+
+ # Stop any running process
+ stop_current_process()
+
+ # Start restore in background
+ background_tasks.add_task(run_restore, target_conn, dump_file, options)
+
+        # Create command preview with the password redacted; fall back to the raw
+        # string if the connection string has no credentials part
+        if "@" in target_conn:
+            redacted = target_conn.split("@", 1)[0].rsplit(":", 1)[0] + ":***@" + target_conn.split("@", 1)[1]
+        else:
+            redacted = target_conn
+        cmd_preview = f"-d {redacted} -v"
+
+ if options.get("no_owner", True):
+ cmd_preview += " --no-owner"
+
+ if options.get("clean", False):
+ cmd_preview += " --clean"
+
+ if options.get("single_transaction", True):
+ cmd_preview += " --single-transaction"
+
+ cmd_preview += f" {os.path.basename(dump_file)}"
+
+ return JSONResponse(content={
+ "success": True,
+ "message": "Restore process started",
+ "command_preview": cmd_preview
+ })
+
+ except Exception as e:
+ log_message(f"Failed to start restore: {str(e)}", "error")
+ return JSONResponse(
+ content={"success": False, "message": str(e)}
+ )
+
+@app.post("/stop-process")
+async def stop_process():
+ """Stop the current database process"""
+ try:
+ stopped = stop_current_process()
+
+ if stopped:
+ return JSONResponse(content={
+ "success": True,
+ "message": "Process stopped successfully"
+ })
+ else:
+ return JSONResponse(content={
+ "success": False,
+ "message": "No running process to stop"
+ })
+
+ except Exception as e:
+ log_message(f"Failed to stop process: {str(e)}", "error")
+ return JSONResponse(
+ content={"success": False, "message": str(e)}
+ )
+
+@app.get("/status")
+async def get_status():
+    """Get the current migration status"""
+    # Return a copy without the subprocess handle, which is not JSON-serializable
+    with migration_lock:
+        return {k: v for k, v in migration_state.items() if k != "process"}
+
+@app.post("/clear-logs")
+async def clear_logs():
+ """Clear all logs"""
+ with migration_lock:
+ migration_state["log"] = []
+
+ return JSONResponse(content={"success": True})
+
+@app.get("/list-dumps")
+async def list_dumps():
+ """List available dump files"""
+ try:
+ dumps_dir = Path("dumps")
+
+ if not dumps_dir.exists():
+ dumps_dir.mkdir()
+ return JSONResponse(content={"success": True, "dumps": []})
+
+ dump_files = []
+ for f in dumps_dir.glob("*"):
+ if f.is_file():
+ file_size = f.stat().st_size
+ modified_time = datetime.datetime.fromtimestamp(f.stat().st_mtime)
+
+ dump_files.append({
+ "name": f.name,
+ "path": str(f),
+ "size_mb": file_size / (1024 * 1024),
+ "date": modified_time.strftime("%Y-%m-%d %H:%M:%S")
+ })
+
+ # Sort by modified time, newest first
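+        # "%Y-%m-%d %H:%M:%S" strings sort lexicographically in chronological order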
+ dump_files.sort(key=lambda x: x["date"], reverse=True)
+
+ return JSONResponse(content={"success": True, "dumps": dump_files})
+
+ except Exception as e:
+ log_message(f"Failed to list dumps: {str(e)}", "error")
+ return JSONResponse(
+ content={"success": False, "message": str(e)}
+ )
+
+@app.get("/downloads/{file_name}")
+async def download_file(file_name: str):
+ """Download a dump file"""
+ file_path = os.path.join("dumps", file_name)
+
+ if not os.path.exists(file_path):
+ raise HTTPException(status_code=404, detail="File not found")
+
+ return FileResponse(
+ path=file_path,
+ filename=file_name,
+ media_type="application/octet-stream"
+ )
+
+if __name__ == "__main__":
+ import uvicorn
+ uvicorn.run(app, host="0.0.0.0", port=7860)
\ No newline at end of file