diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -86,17 +86,15 @@ def log_message(message: str, level: str = "info", command: str = None): "command": command, "id": len(migration_state["log"]) } - with migration_lock: migration_state["log"].append(log_entry) migration_state["last_activity"] = time.time() - logger.info(f"[{level.upper()}] {message}") if command: logger.info(f"Command: {command}") -def test_connection(connection_string: str) -> bool: - """Test a PostgreSQL connection string""" +def test_connection_logic(connection_string: str) -> bool: + """Test a PostgreSQL connection string (internal logic)""" try: conn = psycopg2.connect(connection_string) conn.close() @@ -120,25 +118,21 @@ def monitor_dump_size(): if migration_state["dump_file"] and os.path.exists(migration_state["dump_file"]): # Get current file size current_size = get_file_size_mb(migration_state["dump_file"]) - # Calculate growth rate elapsed = time.time() - migration_state["start_time"] if elapsed > 0: growth_rate = current_size / elapsed # MB/sec - # Update progress state with migration_lock: migration_state["dump_file_size"] = current_size migration_state["progress"]["current_size_mb"] = round(current_size, 2) migration_state["progress"]["growth_rate_mb_per_sec"] = round(growth_rate, 2) - # Update size change for UI display size_change = current_size - migration_state["previous_size"] if size_change > 0: migration_state["previous_size"] = current_size except Exception as e: logger.error(f"Error monitoring dump size: {str(e)}") - time.sleep(1) # Update every second def run_dump(source_conn: str, file_path: str, options: dict): @@ -147,28 +141,28 @@ def run_dump(source_conn: str, file_path: str, options: dict): # Clear any existing file if os.path.exists(file_path): os.remove(file_path) - + # Set environment variables for connection env = os.environ.copy() - + # Build pg_dump command format_flag = "-F" + options.get("format", "c") # Default to custom format cmd = ["pg_dump", source_conn, format_flag, "-v", "-f", file_path] - + # Add schema if specified if options.get("schema"): cmd.extend(["-n", options["schema"]]) - + # Add compression level if specified if options.get("compression") and options["compression"] != "default": cmd.extend(["-Z", options["compression"]]) - + log_message(f"Starting database dump to {file_path}", "info", " ".join(cmd)) - + # Start monitoring thread for file size monitor_thread = threading.Thread(target=monitor_dump_size, daemon=True) monitor_thread.start() - + # Start the dump process with migration_lock: migration_state["start_time"] = time.time() @@ -176,67 +170,75 @@ def run_dump(source_conn: str, file_path: str, options: dict): migration_state["operation"] = "dump" migration_state["dump_file"] = file_path migration_state["dump_completed"] = False - + migration_state["previous_size"] = 0 # Reset previous size + process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, - text=True + text=True, + bufsize=1, # Line buffering + universal_newlines=True ) - + with migration_lock: migration_state["process"] = process - + # Process output - for line in process.stderr: - if "Dumping" in line and "table" in line: - table_name = line.split("Dumping")[1].strip().split(" ")[1] + if process.stderr: + for line in iter(process.stderr.readline, ''): + line = line.strip() + if not line: + continue + # Update log with verbose output from pg_dump + log_message(line, "info") + if "dumping contents of table" in line: + try: + table_name = line.split('"')[1] # Extract table name from quotes + with migration_lock: + migration_state["progress"]["current_table"] = table_name + migration_state["progress"]["tables_completed"] += 1 + except IndexError: + logger.warning(f"Could not parse table name from line: {line}") + # Check if process is still running with migration_lock: - migration_state["progress"]["current_table"] = table_name - migration_state["progress"]["tables_completed"] += 1 - - log_message(f"Dumping table: {table_name}", "info") - + if not migration_state["running"]: + break # Stop processing if process was terminated + # Wait for process to complete stdout, stderr = process.communicate() exit_code = process.returncode - - if exit_code == 0: - # Get final file size - final_size = get_file_size_mb(file_path) - - with migration_lock: - migration_state["dump_file_size"] = final_size - migration_state["progress"]["current_size_mb"] = round(final_size, 2) - migration_state["dump_completed"] = True - migration_state["end_time"] = time.time() - migration_state["running"] = False - migration_state["process"] = None - - total_time = migration_state["end_time"] - migration_state["start_time"] - log_message( - f"Database dump completed successfully. Size: {round(final_size, 2)} MB. Time: {round(total_time, 2)} seconds", - "success" - ) - return True - else: - error_message = stderr or "Unknown error during dump" - log_message(f"Database dump failed: {error_message}", "error") - - with migration_lock: + + with migration_lock: + # Ensure running state is updated based on process completion + if migration_state["running"]: # Only update if not stopped manually + if exit_code == 0: + # Get final file size + final_size = get_file_size_mb(file_path) + migration_state["dump_file_size"] = final_size + migration_state["progress"]["current_size_mb"] = round(final_size, 2) + migration_state["dump_completed"] = True + migration_state["end_time"] = time.time() + total_time = migration_state["end_time"] - migration_state["start_time"] + log_message( + f"Database dump completed successfully. Size: {round(final_size, 2)} MB. Time: {round(total_time, 2)} seconds", + "success" + ) + else: + error_message = stderr or stdout or "Unknown error during dump" + log_message(f"Database dump failed: {error_message}", "error") + migration_state["running"] = False migration_state["process"] = None - - return False - + + return exit_code == 0 + except Exception as e: log_message(f"Error during database dump: {str(e)}", "error") - with migration_lock: migration_state["running"] = False migration_state["process"] = None - return False def run_restore(target_conn: str, file_path: str, options: dict): @@ -244,17 +246,17 @@ def run_restore(target_conn: str, file_path: str, options: dict): try: if not os.path.exists(file_path): log_message(f"Dump file not found: {file_path}", "error") + with migration_lock: + migration_state["running"] = False # Ensure state is consistent return False - + # Set environment variables for connection env = os.environ.copy() - + # Run timescaledb_pre_restore() if specified if options.get("timescaledb_pre_restore", True): pre_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_pre_restore();"] - log_message("Running timescaledb_pre_restore()", "info", " ".join(pre_restore_cmd)) - pre_restore_process = subprocess.Popen( pre_restore_cmd, stdout=subprocess.PIPE, @@ -262,72 +264,90 @@ def run_restore(target_conn: str, file_path: str, options: dict): env=env, text=True ) - pre_restore_stdout, pre_restore_stderr = pre_restore_process.communicate() if pre_restore_process.returncode != 0: - log_message(f"Pre-restore failed: {pre_restore_stderr}", "error") + log_message(f"Pre-restore failed: {pre_restore_stderr or pre_restore_stdout}", "error") + with migration_lock: + migration_state["running"] = False # Ensure state is consistent return False - + # Build pg_restore command cmd = ["pg_restore", "-d", target_conn, "-v"] - + # Add no-owner flag if specified if options.get("no_owner", True): cmd.append("--no-owner") - # Add clean flag if specified if options.get("clean", False): cmd.append("--clean") - # Add single transaction flag if specified if options.get("single_transaction", True): cmd.append("--single-transaction") - # Add file path cmd.append(file_path) - + log_message(f"Starting database restore from {file_path}", "info", " ".join(cmd)) - + with migration_lock: migration_state["start_time"] = time.time() migration_state["running"] = True migration_state["operation"] = "restore" migration_state["restore_completed"] = False - + migration_state["progress"]["tables_completed"] = 0 # Reset counter + process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, - text=True + text=True, + bufsize=1, # Line buffering + universal_newlines=True ) - + with migration_lock: migration_state["process"] = process - + # Process output - for line in process.stderr: - if "processing" in line.lower() and "table" in line.lower(): - try: - table_name = line.split("table")[1].strip().split(" ")[0] - with migration_lock: - migration_state["progress"]["current_table"] = table_name - migration_state["progress"]["tables_completed"] += 1 - - log_message(f"Restoring table: {table_name}", "info") - except: - pass - + if process.stderr: + for line in iter(process.stderr.readline, ''): + line = line.strip() + if not line: + continue + # Log verbose output + log_message(line, "info") + # Try to parse table name (pg_restore output format varies) + if "processing" in line.lower() and ("table data" in line.lower() or "table" in line.lower()): + try: + # Attempt to extract table name, might need refinement + parts = line.split() + table_index = -1 + if "table" in parts: table_index = parts.index("table") + 1 + elif "data" in parts: table_index = parts.index("data") + 1 + + if table_index > 0 and table_index < len(parts): + table_name = parts[table_index].strip('."') + with migration_lock: + migration_state["progress"]["current_table"] = table_name + migration_state["progress"]["tables_completed"] += 1 + else: + logger.warning(f"Could not parse table name from restore line: {line}") + except Exception as parse_err: + logger.warning(f"Error parsing restore line '{line}': {parse_err}") + # Check if process is still running + with migration_lock: + if not migration_state["running"]: + break # Stop processing if process was terminated + # Wait for process to complete stdout, stderr = process.communicate() exit_code = process.returncode - - # Run timescaledb_post_restore() if specified + + post_restore_success = True + # Run timescaledb_post_restore() if specified and restore was successful so far if exit_code == 0 and options.get("timescaledb_post_restore", True): post_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_post_restore(); ANALYZE;"] - log_message("Running timescaledb_post_restore() and ANALYZE", "info", " ".join(post_restore_cmd)) - post_restore_process = subprocess.Popen( post_restore_cmd, stdout=subprocess.PIPE, @@ -335,41 +355,37 @@ def run_restore(target_conn: str, file_path: str, options: dict): env=env, text=True ) - post_restore_stdout, post_restore_stderr = post_restore_process.communicate() if post_restore_process.returncode != 0: - log_message(f"Post-restore failed: {post_restore_stderr}", "error") - - if exit_code == 0: - with migration_lock: - migration_state["restore_completed"] = True - migration_state["end_time"] = time.time() - migration_state["running"] = False - migration_state["process"] = None - - total_time = migration_state["end_time"] - migration_state["start_time"] - log_message( - f"Database restore completed successfully. Time: {round(total_time, 2)} seconds", - "success" - ) - return True - else: - error_message = stderr or "Unknown error during restore" - log_message(f"Database restore failed: {error_message}", "error") - - with migration_lock: + log_message(f"Post-restore failed: {post_restore_stderr or post_restore_stdout}", "error") + post_restore_success = False # Mark post-restore as failed + + with migration_lock: + # Ensure running state is updated based on process completion + if migration_state["running"]: # Only update if not stopped manually + if exit_code == 0 and post_restore_success: + migration_state["restore_completed"] = True + migration_state["end_time"] = time.time() + total_time = migration_state["end_time"] - migration_state["start_time"] + log_message( + f"Database restore completed successfully. Time: {round(total_time, 2)} seconds", + "success" + ) + elif exit_code != 0: + error_message = stderr or stdout or "Unknown error during restore" + log_message(f"Database restore failed: {error_message}", "error") + # If post_restore failed, it's already logged. + migration_state["running"] = False migration_state["process"] = None - - return False - + + return exit_code == 0 and post_restore_success + except Exception as e: log_message(f"Error during database restore: {str(e)}", "error") - with migration_lock: migration_state["running"] = False migration_state["process"] = None - return False def stop_current_process(): @@ -377,37 +393,55 @@ def stop_current_process(): with migration_lock: if migration_state["process"] and migration_state["running"]: try: - migration_state["process"].terminate() - migration_state["process"] = None - migration_state["running"] = False - + pid = migration_state["process"].pid + logger.info(f"Terminating process with PID: {pid}") + migration_state["process"].terminate() # Send SIGTERM + try: + # Wait a short time for graceful termination + migration_state["process"].wait(timeout=2) + except subprocess.TimeoutExpired: + logger.warning(f"Process {pid} did not terminate gracefully, sending SIGKILL.") + migration_state["process"].kill() # Send SIGKILL if necessary + migration_state["process"].wait() # Wait for kill + operation = migration_state["operation"] log_message(f"Database {operation} operation stopped by user", "warning") + + # Update state *after* termination attempt + migration_state["process"] = None + migration_state["running"] = False + migration_state["end_time"] = time.time() return True except Exception as e: log_message(f"Error stopping process: {str(e)}", "error") + # Attempt to force state update even if termination fails + migration_state["process"] = None + migration_state["running"] = False return False return False @app.get("/", response_class=HTMLResponse) async def home(request: Request): """Home page with migration UI""" - with open("templates/index.html", "w") as f: - f.write(""" + # In a real app, load from file, but for simplicity, keep it inline. + # Create the file if it doesn't exist (e.g., first run) + if not os.path.exists("templates/index.html"): + with open("templates/index.html", "w") as f: + f.write("Placeholder - HTML will be generated") # Basic placeholder + + # The actual HTML content + html_content = """
After connecting to your databases, proceed to the Dump tab to create a backup or the Restore tab to recover from a backup.
-TimescaleDB Migrator is a tool designed to simplify the process of migrating data between TimescaleDB instances using the PostgreSQL native backup and restore utilities: pg_dump and pg_restore.
The dump operation uses pg_dump to create a backup of your source database. This backup can be in various formats (custom, directory, plain SQL, or tar) and with different compression levels.
The restore operation uses pg_restore to import your backup into the target database. It includes TimescaleDB-specific pre and post-restore functions to ensure data integrity.