from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import os
import re
import subprocess
import threading
import time
import json
import datetime
import uuid
import shutil
from typing import Dict, Any, Optional, List
from pathlib import Path
import psycopg2
import logging
import signal
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("pgmigrator")
# Initialize FastAPI app
app = FastAPI(title="TimescaleDB Migration Tool")
# Enable CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create necessary directories
os.makedirs("templates", exist_ok=True)
os.makedirs("dumps", exist_ok=True)
# Setup templates
templates = Jinja2Templates(directory="templates")
# Create a static files directory for downloads
static_dir = Path("dumps")
static_dir.mkdir(exist_ok=True)
app.mount("/downloads", StaticFiles(directory="dumps"), name="downloads")
# Global state for migration
migration_state = {
"id": str(uuid.uuid4()),
"running": False,
"operation": None, # "dump" or "restore"
"start_time": None,
"end_time": None,
"dump_file": None,
"dump_file_size": 0,
"previous_size": 0,
"dump_completed": False,
"restore_completed": False,
"last_activity": time.time(),
"log": [],
"process": None,
"chunk_info_internal": None, # NEW: internal TSDB chunk details (not returned to client)
"progress": {
"current_table": None,
"tables_completed": 0,
"total_tables": 0,
"current_size_mb": 0,
"growth_rate_mb_per_sec": 0,
"estimated_time_remaining": None,
"percent_complete": 0,
"total_expected_bytes": 0, # NEW: expected total bytes from chunk map
"bytes_completed": 0, # NEW: bytes completed (chunk-based)
"chunks_completed": 0, # NEW: chunks completed (count)
"chunks_total": 0, # NEW: total chunks (from chunk map)
"counted_chunk_names": [] # NEW: to prevent double counting (internal)
}
}
# Lock for updating global state
migration_lock = threading.Lock()
def log_message(message: str, level: str = "info", command: Optional[str] = None):
"""Add timestamped log message with level"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = {
        "timestamp": timestamp,
        "message": message,
        "level": level,
        "command": command,
    }
    with migration_lock:
        # Assign the id under the lock so concurrent writers get unique ids.
        log_entry["id"] = len(migration_state["log"])
        migration_state["log"].append(log_entry)
        migration_state["last_activity"] = time.time()
logger.info(f"[{level.upper()}] {message}")
if command:
logger.info(f"Command: {command}")
def test_connection_logic(connection_string: str) -> bool:
"""Test a PostgreSQL connection string (internal logic)"""
try:
conn = psycopg2.connect(connection_string)
conn.close()
return True
except Exception as e:
logger.error(f"Connection test failed: {str(e)}")
return False
def get_file_size_mb(file_path: str) -> float:
"""Get file size in megabytes"""
try:
size_bytes = os.path.getsize(file_path)
return size_bytes / (1024 * 1024) # Convert to MB
except Exception:
return 0
def build_timescaledb_chunk_info(conn):
"""Build a comprehensive map of all TimescaleDB chunks and their compressed counterparts."""
info = {
"generated_at": datetime.datetime.utcnow().isoformat() + 'Z',
"summary": {
"chunks_count": 0,
"hypertables_count": 0,
"total_effective_size_bytes": 0
},
"name_to_entry": {}, # map of schema.table -> entry
"entries": [] # list of chunk relationship entries
}
try:
with conn.cursor() as cur:
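            # One row per chunk: join each chunk to its hypertable and LEFT
            # JOIN to its compressed counterpart (if any); size both relations
            # with pg_total_relation_size().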
cur.execute("""
SELECT
ht.id AS hypertable_id,
ht.schema_name || '.' || ht.table_name AS hypertable,
orig.schema_name || '.' || orig.table_name AS original_chunk,
CASE WHEN comp.id IS NOT NULL THEN comp.schema_name || '.' || comp.table_name END AS compressed_chunk,
pg_total_relation_size(orig.schema_name || '.' || orig.table_name) AS original_size_bytes,
CASE WHEN comp.id IS NOT NULL THEN pg_total_relation_size(comp.schema_name || '.' || comp.table_name) END AS compressed_size_bytes,
(orig.compressed_chunk_id IS NOT NULL) AS is_compressed,
(ht.table_name LIKE '_materialized_hypertable_%') AS is_cagg
FROM _timescaledb_catalog.chunk AS orig
JOIN _timescaledb_catalog.hypertable AS ht ON ht.id = orig.hypertable_id
LEFT JOIN _timescaledb_catalog.chunk AS comp ON orig.compressed_chunk_id = comp.id
""")
rows = cur.fetchall()
hyps = set()
total_effective = 0
for (
hypertable_id, hypertable, original_chunk, compressed_chunk,
original_size, compressed_size, is_compressed, is_cagg
) in rows:
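                # When a chunk is compressed its data lives in the compressed
                # chunk, so the compressed size is the better estimate of the
                # dump/restore work for that chunk.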
effective_size = compressed_size if compressed_size is not None else original_size
entry = {
"hypertable_id": hypertable_id,
"hypertable": hypertable,
"original_chunk": original_chunk,
"compressed_chunk": compressed_chunk,
"original_size_bytes": int(original_size) if original_size is not None else 0,
"compressed_size_bytes": int(compressed_size) if compressed_size is not None else None,
"effective_size_bytes": int(effective_size) if effective_size is not None else 0,
"compression_status": "COMPRESSED" if is_compressed else "UNCOMPRESSED",
"is_cagg": bool(is_cagg)
}
info["entries"].append(entry)
hyps.add(hypertable)
total_effective += entry["effective_size_bytes"]
# Map both names to same entry for fast lookup
info["name_to_entry"][original_chunk] = entry
if compressed_chunk:
info["name_to_entry"][compressed_chunk] = entry
info["summary"]["chunks_count"] = len(info["entries"])
info["summary"]["hypertables_count"] = len(hyps)
info["summary"]["total_effective_size_bytes"] = total_effective
except Exception as e:
logger.warning(f"Could not build TimescaleDB chunk map: {e}")
return info
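# A minimal usage sketch (an assumption, not part of the flow shown here):
# seed the chunk map into migration_state before starting a dump so the
# progress loops in run_dump()/run_restore() can do byte-weighted accounting.
# Assumes `source_conn` is a valid libpq connection string from the caller.
#
#   conn = psycopg2.connect(source_conn)
#   try:
#       info = build_timescaledb_chunk_info(conn)
#   finally:
#       conn.close()
#   with migration_lock:
#       migration_state["chunk_info_internal"] = info
#       summary = info["summary"]
#       migration_state["progress"]["total_expected_bytes"] = summary["total_effective_size_bytes"]
#       migration_state["progress"]["chunks_total"] = summary["chunks_count"]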
def monitor_dump_size():
"""Monitor the dump file size and update state"""
while migration_state["running"] and migration_state["operation"] == "dump":
try:
if migration_state["dump_file"] and os.path.exists(migration_state["dump_file"]):
# Get current file size
current_size = get_file_size_mb(migration_state["dump_file"])
# Calculate growth rate
elapsed = time.time() - migration_state["start_time"]
if elapsed > 0:
growth_rate = current_size / elapsed # MB/sec
# Update progress state
with migration_lock:
migration_state["dump_file_size"] = current_size
migration_state["progress"]["current_size_mb"] = round(current_size, 2)
migration_state["progress"]["growth_rate_mb_per_sec"] = round(growth_rate, 2)
# Update size change for UI display
size_change = current_size - migration_state["previous_size"]
if size_change > 0:
migration_state["previous_size"] = current_size
except Exception as e:
logger.error(f"Error monitoring dump size: {str(e)}")
time.sleep(1) # Update every second
def run_dump(source_conn: str, file_path: str, options: dict):
"""Run pg_dump in a background thread with chunk-based progress tracking."""
try:
if os.path.exists(file_path):
os.remove(file_path)
env = os.environ.copy()
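        # pg_dump flags: -F selects the archive format (c=custom, d=directory,
        # t=tar, p=plain), -v emits per-table progress on stderr, -f names the
        # output file.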
format_flag = "-F" + options.get("format", "c")
cmd = ["pg_dump", source_conn, format_flag, "-v", "-f", file_path]
if options.get("schema"):
cmd.extend(["-n", options["schema"]])
if options.get("compression") and options["compression"] != "default":
cmd.extend(["-Z", options["compression"]])
log_message(f"Starting database dump to {file_path}", "info", " ".join(cmd))
        with migration_lock:
            migration_state["start_time"] = time.time()
            migration_state["running"] = True
            migration_state["operation"] = "dump"
            migration_state["dump_file"] = file_path
            migration_state["dump_completed"] = False
            migration_state["previous_size"] = 0
        # Start the size monitor only after the state above is set, so it
        # never reads a None start_time.
        monitor_thread = threading.Thread(target=monitor_dump_size, daemon=True)
        monitor_thread.start()
preexec_fn_to_use = os.setsid if hasattr(os, 'setsid') else None
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True,
bufsize=1,
universal_newlines=True,
preexec_fn=preexec_fn_to_use
)
with migration_lock:
migration_state["process"] = process
if process.stderr:
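            # Verbose pg_dump writes per-table progress to stderr, e.g.
            # pg_dump: dumping contents of table "public"."metrics"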
for line in iter(process.stderr.readline, ''):
line = line.strip()
if not line:
continue
log_message(line, "info")
if "dumping contents of table" in line:
qualified = None
try:
quoted = re.findall(r'"([^"]+)"', line)
if len(quoted) >= 2:
qualified = f"{quoted[-2]}.{quoted[-1]}"
else:
m = re.search(r'dumping contents of table\s+([^\s]+)', line, re.IGNORECASE)
if m:
qualified = m.group(1).strip().strip('"')
except Exception as parse_err:
logger.warning(f"Could not parse table name from dump line: {line} ({parse_err})")
if qualified:
with migration_lock:
migration_state["progress"]["current_table"] = qualified
# Chunk-based progress
chunk_info = migration_state.get("chunk_info_internal")
if chunk_info:
entry = chunk_info["name_to_entry"].get(qualified)
if entry:
counted = set(migration_state["progress"].get("counted_chunk_names", []))
names_to_mark = {entry["original_chunk"]}
if entry["compressed_chunk"]:
names_to_mark.add(entry["compressed_chunk"])
# only count once
if not (counted & names_to_mark):
migration_state["progress"]["bytes_completed"] += entry["effective_size_bytes"]
migration_state["progress"]["chunks_completed"] += 1
counted |= names_to_mark
migration_state["progress"]["counted_chunk_names"] = list(counted)
total = migration_state["progress"].get("total_expected_bytes", 0) or 0
if total > 0:
migration_state["progress"]["percent_complete"] = min(
99,
int((migration_state["progress"]["bytes_completed"] / total) * 100)
)
elif migration_state["progress"]["chunks_total"]:
migration_state["progress"]["percent_complete"] = min(
99,
int(migration_state["progress"]["chunks_completed"] * 100 / migration_state["progress"]["chunks_total"])
)
elapsed = time.time() - (migration_state.get("start_time") or time.time())
if elapsed > 5 and migration_state["progress"]["bytes_completed"] > 0:
rate = migration_state["progress"]["bytes_completed"] / elapsed
remaining = total - migration_state["progress"]["bytes_completed"]
eta = int(remaining / rate) if rate > 0 and total > 0 else None
migration_state["progress"]["estimated_time_remaining"] = eta
# Maintain table count as well (legacy)
migration_state["progress"]["tables_completed"] += 1
with migration_lock:
if not migration_state["running"]:
break
stdout, stderr = process.communicate()
exit_code = process.returncode
with migration_lock:
if migration_state["running"]:
if exit_code == 0:
final_size = get_file_size_mb(file_path)
migration_state["dump_file_size"] = final_size
migration_state["progress"]["current_size_mb"] = round(final_size, 2)
migration_state["dump_completed"] = True
migration_state["end_time"] = time.time()
migration_state["progress"]["percent_complete"] = 100
migration_state["progress"]["estimated_time_remaining"] = 0
total_time = migration_state["end_time"] - migration_state["start_time"]
log_message(
f"Database dump completed successfully. Size: {round(final_size, 2)} MB. Time: {round(total_time, 2)} seconds",
"success"
)
else:
error_message = stderr or stdout or "Unknown error during dump"
log_message(f"Database dump failed: {error_message}", "error")
migration_state["running"] = False
migration_state["process"] = None
return exit_code == 0
except Exception as e:
log_message(f"Error during database dump: {str(e)}", "error")
with migration_lock:
migration_state["running"] = False
migration_state["process"] = None
return False
def run_restore(target_conn: str, file_path: str, options: dict):
"""Run pg_restore in a background thread with chunk-based progress tracking."""
try:
if not os.path.exists(file_path):
log_message(f"Dump file not found: {file_path}", "error")
with migration_lock:
migration_state["running"] = False
return False
env = os.environ.copy()
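        # timescaledb_pre_restore() puts the target database into restore mode
        # (pausing background workers) so pg_restore can safely repopulate the
        # TimescaleDB catalog.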
if options.get("timescaledb_pre_restore", True):
pre_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_pre_restore();"]
log_message("Running timescaledb_pre_restore()", "info", " ".join(pre_restore_cmd))
pre_restore_process = subprocess.Popen(
pre_restore_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True
)
pre_restore_stdout, pre_restore_stderr = pre_restore_process.communicate()
if pre_restore_process.returncode != 0:
log_message(f"Pre-restore failed: {pre_restore_stderr or pre_restore_stdout}", "error")
with migration_lock:
migration_state["running"] = False
return False
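        # pg_restore flags: --no-owner skips ownership changes (handy when the
        # target role differs from the source), --clean drops matching objects
        # before recreating them.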
cmd = ["pg_restore", "-d", target_conn, "-v"]
if options.get("no_owner", True):
cmd.append("--no-owner")
if options.get("clean", False):
cmd.append("--clean")
        # --single-transaction is off by default so one failing object does
        # not roll back the entire restore.
        if options.get("single_transaction", False):
            cmd.append("--single-transaction")
cmd.append(file_path)
log_message(f"Starting database restore from {file_path}", "info", " ".join(cmd))
with migration_lock:
migration_state["start_time"] = time.time()
migration_state["running"] = True
migration_state["operation"] = "restore"
migration_state["restore_completed"] = False
migration_state["progress"]["tables_completed"] = 0
preexec_fn_to_use = os.setsid if hasattr(os, 'setsid') else None
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True,
bufsize=1,
universal_newlines=True,
preexec_fn=preexec_fn_to_use
)
with migration_lock:
migration_state["process"] = process
if process.stderr:
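            # Verbose pg_restore reports progress on stderr, e.g.
            # pg_restore: processing data for table "public"."metrics"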
for line in iter(process.stderr.readline, ''):
line = line.strip()
if not line:
continue
log_message(line, "info")
if re.search(r'(processing|restoring)\s+data\s+for\s+table', line, re.IGNORECASE):
qualified = None
try:
quoted = re.findall(r'"([^"]+)"', line)
if len(quoted) >= 2:
qualified = f"{quoted[-2]}.{quoted[-1]}"
else:
m = re.search(r'(processing|restoring)\s+data\s+for\s+table\s+([^\s]+)', line, re.IGNORECASE)
if m:
qualified = m.group(2).strip().strip('"')
except Exception as parse_err:
logger.warning(f"Error parsing restore line '{line}': {parse_err}")
if qualified:
with migration_lock:
migration_state["progress"]["current_table"] = qualified
chunk_info = migration_state.get("chunk_info_internal")
if chunk_info:
entry = chunk_info["name_to_entry"].get(qualified)
if entry:
counted = set(migration_state["progress"].get("counted_chunk_names", []))
names_to_mark = {entry["original_chunk"]}
if entry["compressed_chunk"]:
names_to_mark.add(entry["compressed_chunk"])
if not (counted & names_to_mark):
migration_state["progress"]["bytes_completed"] += entry["effective_size_bytes"]
migration_state["progress"]["chunks_completed"] += 1
counted |= names_to_mark
migration_state["progress"]["counted_chunk_names"] = list(counted)
total = migration_state["progress"].get("total_expected_bytes", 0) or 0
if total > 0:
migration_state["progress"]["percent_complete"] = min(
99,
int((migration_state["progress"]["bytes_completed"] / total) * 100)
)
elif migration_state["progress"]["chunks_total"]:
migration_state["progress"]["percent_complete"] = min(
99,
int(migration_state["progress"]["chunks_completed"] * 100 / migration_state["progress"]["chunks_total"])
)
elapsed = time.time() - (migration_state.get("start_time") or time.time())
if elapsed > 5 and migration_state["progress"]["bytes_completed"] > 0:
rate = migration_state["progress"]["bytes_completed"] / elapsed
remaining = total - migration_state["progress"]["bytes_completed"]
eta = int(remaining / rate) if rate > 0 and total > 0 else None
migration_state["progress"]["estimated_time_remaining"] = eta
# Maintain legacy counter
migration_state["progress"]["tables_completed"] += 1
with migration_lock:
if not migration_state["running"]:
break
stdout, stderr = process.communicate()
exit_code = process.returncode
post_restore_success = True
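        # timescaledb_post_restore() takes the database out of restore mode and
        # resumes background workers; ANALYZE refreshes planner statistics for
        # the restored tables.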
if exit_code == 0 and options.get("timescaledb_post_restore", True):
            post_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_post_restore();", "-c", "ANALYZE;"]  # two -c commands so ANALYZE runs separately
log_message("Running timescaledb_post_restore() and ANALYZE", "info", " ".join(post_restore_cmd))
post_restore_process = subprocess.Popen(
post_restore_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True
)
post_restore_stdout, post_restore_stderr = post_restore_process.communicate()
if post_restore_process.returncode != 0:
log_message(f"Post-restore failed: {post_restore_stderr or post_restore_stdout}", "error")
post_restore_success = False
with migration_lock:
if migration_state["running"]:
if exit_code == 0 and post_restore_success:
migration_state["restore_completed"] = True
migration_state["end_time"] = time.time()
migration_state["progress"]["percent_complete"] = 100
migration_state["progress"]["estimated_time_remaining"] = 0
total_time = migration_state["end_time"] - migration_state["start_time"]
log_message(
f"Database restore completed successfully. Time: {round(total_time, 2)} seconds",
"success"
)
elif exit_code != 0:
error_message = stderr or stdout or "Unknown error during restore"
log_message(f"Database restore failed: {error_message}", "error")
migration_state["running"] = False
migration_state["process"] = None
return exit_code == 0 and post_restore_success
except Exception as e:
log_message(f"Error during database restore: {str(e)}", "error")
with migration_lock:
migration_state["running"] = False
migration_state["process"] = None
return False
def stop_current_process():
"""Stop the current process with improved forceful termination"""
with migration_lock:
if migration_state["process"] and migration_state["running"]:
try:
process = migration_state["process"]
pid = process.pid
operation = migration_state["operation"]
log_message(f"Attempting to stop {operation} process (PID: {pid})...", "warning")
# Check if process is already terminated before trying to stop
if process.poll() is not None:
log_message(f"{operation.capitalize()} process (PID: {pid}) already terminated.", "info")
migration_state["process"] = None
migration_state["running"] = False
return True
# First try graceful termination (SIGTERM)
process.terminate()
# Wait up to 3 seconds for graceful termination
for _ in range(30): # 3 seconds with 0.1s checks
if process.poll() is not None: # Process has terminated
log_message(f"{operation.capitalize()} process (PID: {pid}) terminated gracefully (SIGTERM)", "warning")
break
time.sleep(0.1)
else: # Loop finished without break, process still running
# If still running, force kill with SIGKILL
if process.poll() is None:
log_message(f"Process (PID: {pid}) not responding to graceful termination, forcing kill (SIGKILL)...", "warning")
                        # Kill the whole process group (POSIX only) so any
                        # helpers spawned by the child die with it.
                        if hasattr(os, 'killpg') and hasattr(os, 'getpgid'):
                            try:
                                # os.killpg() signals every process in the group
                                # created by os.setsid when the child started.
                                os.killpg(os.getpgid(pid), signal.SIGKILL)
                                log_message(f"Sent SIGKILL to process group of PID {pid}", "warning")
                            except ProcessLookupError:
                                log_message(f"Process group for PID {pid} not found (already terminated?).", "info")
                                # Process died between poll() and killpg(); nothing more to do.
except Exception as kill_err:
log_message(f"Error killing process group for PID {pid}: {kill_err}. Falling back to direct kill.", "error")
# Fallback to direct kill if process group kill fails
process.kill()
log_message(f"Sent SIGKILL directly to PID {pid}", "warning")
else: # Not on Unix or functions unavailable
process.kill()
log_message(f"Sent SIGKILL directly to PID {pid} (killpg not available)", "warning")
# Wait a bit for kill to take effect
time.sleep(0.5)
process.poll() # Update process status after kill attempt
# Final check
if process.poll() is None:
log_message(f"Warning: Process (PID: {pid}) may not have terminated successfully after SIGKILL", "error")
# Even if termination is uncertain, update state to reflect stop attempt
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return False # Indicate potential failure
else:
log_message(f"Database {operation} operation (PID: {pid}) stopped", "warning")
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return True
except ProcessLookupError:
# This can happen if the process terminated between the initial check and trying to kill it
log_message(f"Process (PID: {pid}) already terminated before stop action completed.", "info")
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return True
except Exception as e:
log_message(f"Error stopping process: {str(e)}", "error")
# Force state update even on error
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return False
else:
# No process was running or associated with the state
log_message("Stop command received, but no process found in current state.", "info")
# Ensure state reflects not running if it wasn't already
if migration_state["running"]:
migration_state["running"] = False
migration_state["process"] = None
return False # Indicate no action was needed/taken on a process
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Home page with migration UI"""
    # The UI is kept inline for simplicity; in a larger app it would live in
    # templates/. A placeholder file is created on first run.
    if not os.path.exists("templates/index.html"):
        with open("templates/index.html", "w") as f:
            f.write("Placeholder - HTML will be generated")
html_content = """
    <!-- Inline UI markup (tags lost in extraction). Recoverable structure:
         Header: "TimescaleDB Migrator" with an "Idle" status badge
         Tabs: Connections | Dump | Restore | Logs | About
         Connections tab: Source and Target panels showing "Not Connected" /
           "No Connection" with "Test your connection to view database
           information", plus a "Next Steps" note pointing to the Dump and
           Restore tabs
         Dump tab: settings for the pg_dump output format, compression level
           ("Higher compression saves space but can take longer"), optional
           schema filter ("Leave empty for all schemas"), and output file name
           (appropriate extension added automatically); an Activity Log panel
         About tab: describes migrating between TimescaleDB instances with
           pg_dump/pg_restore, key features (easy migration, secure
           connections, backup download, real-time monitoring), and notes that
           the restore runs TimescaleDB pre/post-restore functions -->