arcticaurora committed · verified
Commit f5b7cfb · 1 Parent(s): 1ff7dc1

Update app.py

Files changed (1)
  1. app.py +383 -169
app.py CHANGED
@@ -4,6 +4,7 @@ from fastapi.templating import Jinja2Templates
 from fastapi.staticfiles import StaticFiles
 from fastapi.middleware.cors import CORSMiddleware
 import os
+import re
 import subprocess
 import threading
 import time
@@ -66,6 +67,7 @@ migration_state = {
     "last_activity": time.time(),
     "log": [],
     "process": None,
+    "chunk_info_internal": None,  # NEW: internal TSDB chunk details (not returned to client)
     "progress": {
         "current_table": None,
         "tables_completed": 0,
@@ -73,10 +75,16 @@ migration_state = {
         "current_size_mb": 0,
         "growth_rate_mb_per_sec": 0,
         "estimated_time_remaining": None,
-        "percent_complete": 0
+        "percent_complete": 0,
+        "total_expected_bytes": 0,   # NEW: expected total bytes from chunk map
+        "bytes_completed": 0,        # NEW: bytes completed (chunk-based)
+        "chunks_completed": 0,       # NEW: chunks completed (count)
+        "chunks_total": 0,           # NEW: total chunks (from chunk map)
+        "counted_chunk_names": []    # NEW: to prevent double counting (internal)
     }
 }
 
+
 # Lock for updating global state
 migration_lock = threading.Lock()
 
@@ -115,6 +123,69 @@ def get_file_size_mb(file_path: str) -> float:
     except Exception:
         return 0
 
+def build_timescaledb_chunk_info(conn):
+    """Build a comprehensive map of all TimescaleDB chunks and their compressed counterparts."""
+    info = {
+        "generated_at": datetime.datetime.utcnow().isoformat() + 'Z',
+        "summary": {
+            "chunks_count": 0,
+            "hypertables_count": 0,
+            "total_effective_size_bytes": 0
+        },
+        "name_to_entry": {},  # map of schema.table -> entry
+        "entries": []         # list of chunk relationship entries
+    }
+    try:
+        with conn.cursor() as cur:
+            cur.execute("""
+                SELECT
+                    ht.id AS hypertable_id,
+                    ht.schema_name || '.' || ht.table_name AS hypertable,
+                    orig.schema_name || '.' || orig.table_name AS original_chunk,
+                    CASE WHEN comp.id IS NOT NULL THEN comp.schema_name || '.' || comp.table_name END AS compressed_chunk,
+                    pg_total_relation_size(orig.schema_name || '.' || orig.table_name) AS original_size_bytes,
+                    CASE WHEN comp.id IS NOT NULL THEN pg_total_relation_size(comp.schema_name || '.' || comp.table_name) END AS compressed_size_bytes,
+                    (orig.compressed_chunk_id IS NOT NULL) AS is_compressed,
+                    (ht.table_name LIKE '_materialized_hypertable_%') AS is_cagg
+                FROM _timescaledb_catalog.chunk AS orig
+                JOIN _timescaledb_catalog.hypertable AS ht ON ht.id = orig.hypertable_id
+                LEFT JOIN _timescaledb_catalog.chunk AS comp ON orig.compressed_chunk_id = comp.id
+            """)
+            rows = cur.fetchall()
+            hyps = set()
+            total_effective = 0
+            for (
+                hypertable_id, hypertable, original_chunk, compressed_chunk,
+                original_size, compressed_size, is_compressed, is_cagg
+            ) in rows:
+                effective_size = compressed_size if compressed_size is not None else original_size
+                entry = {
+                    "hypertable_id": hypertable_id,
+                    "hypertable": hypertable,
+                    "original_chunk": original_chunk,
+                    "compressed_chunk": compressed_chunk,
+                    "original_size_bytes": int(original_size) if original_size is not None else 0,
+                    "compressed_size_bytes": int(compressed_size) if compressed_size is not None else None,
+                    "effective_size_bytes": int(effective_size) if effective_size is not None else 0,
+                    "compression_status": "COMPRESSED" if is_compressed else "UNCOMPRESSED",
+                    "is_cagg": bool(is_cagg)
+                }
+                info["entries"].append(entry)
+                hyps.add(hypertable)
+                total_effective += entry["effective_size_bytes"]
+                # Map both names to same entry for fast lookup
+                info["name_to_entry"][original_chunk] = entry
+                if compressed_chunk:
+                    info["name_to_entry"][compressed_chunk] = entry
+
+            info["summary"]["chunks_count"] = len(info["entries"])
+            info["summary"]["hypertables_count"] = len(hyps)
+            info["summary"]["total_effective_size_bytes"] = total_effective
+    except Exception as e:
+        logger.warning(f"Could not build TimescaleDB chunk map: {e}")
+    return info
+
+
 def monitor_dump_size():
     """Monitor the dump file size and update state"""
     while migration_state["running"] and migration_state["operation"] == "dump":
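Note: build_timescaledb_chunk_info registers each entry under both its original and its compressed chunk name, so a single dictionary lookup resolves whichever name pg_dump or pg_restore later prints. A minimal usage sketch (the connection string and chunk name below are illustrative, not from this commit):

import psycopg2

conn = psycopg2.connect("postgresql://user:pass@host/db")  # illustrative connection string
chunk_map = build_timescaledb_chunk_info(conn)

# "_timescaledb_internal._hyper_1_42_chunk" is a hypothetical chunk name
entry = chunk_map["name_to_entry"].get("_timescaledb_internal._hyper_1_42_chunk")
if entry:
    size_mb = entry["effective_size_bytes"] / (1024 * 1024)
    print(f"{entry['hypertable']}: {size_mb:.2f} MB ({entry['compression_status']})")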
@@ -140,48 +211,35 @@ def monitor_dump_size():
         time.sleep(1)  # Update every second
 
 def run_dump(source_conn: str, file_path: str, options: dict):
-    """Run pg_dump in a background thread"""
+    """Run pg_dump in a background thread with chunk-based progress tracking."""
     try:
-        # Clear any existing file
         if os.path.exists(file_path):
             os.remove(file_path)
 
-        # Set environment variables for connection
         env = os.environ.copy()
-
-        # Build pg_dump command
-        format_flag = "-F" + options.get("format", "c")  # Default to custom format
+        format_flag = "-F" + options.get("format", "c")
         cmd = ["pg_dump", source_conn, format_flag, "-v", "-f", file_path]
 
-        # Add schema if specified
         if options.get("schema"):
             cmd.extend(["-n", options["schema"]])
 
-        # Add compression level if specified
         if options.get("compression") and options["compression"] != "default":
             cmd.extend(["-Z", options["compression"]])
 
         log_message(f"Starting database dump to {file_path}", "info", " ".join(cmd))
 
-        # Start monitoring thread for file size
         monitor_thread = threading.Thread(target=monitor_dump_size, daemon=True)
         monitor_thread.start()
 
-        # Start the dump process
         with migration_lock:
             migration_state["start_time"] = time.time()
             migration_state["running"] = True
             migration_state["operation"] = "dump"
             migration_state["dump_file"] = file_path
             migration_state["dump_completed"] = False
-            migration_state["previous_size"] = 0  # Reset previous size
+            migration_state["previous_size"] = 0
 
-        # Use preexec_fn=os.setsid to create a new process group
-        # This is necessary for the killpg logic in stop_current_process
-        # Note: preexec_fn is Unix-specific. This won't work directly on Windows.
-        preexec_fn_to_use = None
-        if hasattr(os, 'setsid'):
-            preexec_fn_to_use = os.setsid
+        preexec_fn_to_use = os.setsid if hasattr(os, 'setsid') else None
 
         process = subprocess.Popen(
             cmd,
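Note: launching pg_dump with preexec_fn=os.setsid puts it in its own process group, which is what the killpg-based logic in stop_current_process (mentioned in the removed comments) relies on. A sketch of that termination pattern, assuming the stop logic works roughly like this; the helper below is illustrative, not the commit's actual implementation:

import os
import signal
import subprocess

def terminate_process_group(process: subprocess.Popen, grace_seconds: float = 5.0):
    """Illustrative: SIGTERM the whole group, escalate to SIGKILL if it lingers."""
    try:
        pgid = os.getpgid(process.pid)       # group id created by os.setsid
        os.killpg(pgid, signal.SIGTERM)
        try:
            process.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            os.killpg(pgid, signal.SIGKILL)  # forceful fallback
    except ProcessLookupError:
        pass                                 # process already exited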
@@ -189,49 +247,89 @@ def run_dump(source_conn: str, file_path: str, options: dict):
             stderr=subprocess.PIPE,
             env=env,
             text=True,
-            bufsize=1,  # Line buffering
+            bufsize=1,
             universal_newlines=True,
-            preexec_fn=preexec_fn_to_use  # Create a new process group
+            preexec_fn=preexec_fn_to_use
         )
 
         with migration_lock:
             migration_state["process"] = process
 
-        # Process output
         if process.stderr:
             for line in iter(process.stderr.readline, ''):
                 line = line.strip()
                 if not line:
                     continue
-                # Update log with verbose output from pg_dump
                 log_message(line, "info")
+
                 if "dumping contents of table" in line:
+                    qualified = None
                     try:
-                        table_name = line.split('"')[1]  # Extract table name from quotes
+                        quoted = re.findall(r'"([^"]+)"', line)
+                        if len(quoted) >= 2:
+                            qualified = f"{quoted[-2]}.{quoted[-1]}"
+                        else:
+                            m = re.search(r'dumping contents of table\s+([^\s]+)', line, re.IGNORECASE)
+                            if m:
+                                qualified = m.group(1).strip().strip('"')
+                    except Exception as parse_err:
+                        logger.warning(f"Could not parse table name from dump line: {line} ({parse_err})")
+
+                    if qualified:
                         with migration_lock:
-                            migration_state["progress"]["current_table"] = table_name
+                            migration_state["progress"]["current_table"] = qualified
+                            # Chunk-based progress
+                            chunk_info = migration_state.get("chunk_info_internal")
+                            if chunk_info:
+                                entry = chunk_info["name_to_entry"].get(qualified)
+                                if entry:
+                                    counted = set(migration_state["progress"].get("counted_chunk_names", []))
+                                    names_to_mark = {entry["original_chunk"]}
+                                    if entry["compressed_chunk"]:
+                                        names_to_mark.add(entry["compressed_chunk"])
+                                    # only count once
+                                    if not (counted & names_to_mark):
+                                        migration_state["progress"]["bytes_completed"] += entry["effective_size_bytes"]
+                                        migration_state["progress"]["chunks_completed"] += 1
+                                        counted |= names_to_mark
+                                        migration_state["progress"]["counted_chunk_names"] = list(counted)
+                                    total = migration_state["progress"].get("total_expected_bytes", 0) or 0
+                                    if total > 0:
+                                        migration_state["progress"]["percent_complete"] = min(
+                                            99,
+                                            int((migration_state["progress"]["bytes_completed"] / total) * 100)
+                                        )
+                                    elif migration_state["progress"]["chunks_total"]:
+                                        migration_state["progress"]["percent_complete"] = min(
+                                            99,
+                                            int(migration_state["progress"]["chunks_completed"] * 100 / migration_state["progress"]["chunks_total"])
+                                        )
+                                    elapsed = time.time() - (migration_state.get("start_time") or time.time())
+                                    if elapsed > 5 and migration_state["progress"]["bytes_completed"] > 0:
+                                        rate = migration_state["progress"]["bytes_completed"] / elapsed
+                                        remaining = total - migration_state["progress"]["bytes_completed"]
+                                        eta = int(remaining / rate) if rate > 0 and total > 0 else None
+                                        migration_state["progress"]["estimated_time_remaining"] = eta
+                            # Maintain table count as well (legacy)
                             migration_state["progress"]["tables_completed"] += 1
-                    except IndexError:
-                        logger.warning(f"Could not parse table name from line: {line}")
-                # Check if process is still running
+
                 with migration_lock:
                     if not migration_state["running"]:
-                        break  # Stop processing if process was terminated
+                        break
 
-        # Wait for process to complete
         stdout, stderr = process.communicate()
         exit_code = process.returncode
 
         with migration_lock:
-            # Ensure running state is updated based on process completion
-            if migration_state["running"]:  # Only update if not stopped manually
+            if migration_state["running"]:
                 if exit_code == 0:
-                    # Get final file size
                     final_size = get_file_size_mb(file_path)
                     migration_state["dump_file_size"] = final_size
                     migration_state["progress"]["current_size_mb"] = round(final_size, 2)
                     migration_state["dump_completed"] = True
                     migration_state["end_time"] = time.time()
+                    migration_state["progress"]["percent_complete"] = 100
+                    migration_state["progress"]["estimated_time_remaining"] = 0
                     total_time = migration_state["end_time"] - migration_state["start_time"]
                     log_message(
                         f"Database dump completed successfully. Size: {round(final_size, 2)} MB. Time: {round(total_time, 2)} seconds",
@@ -253,19 +351,18 @@ def run_dump(source_conn: str, file_path: str, options: dict):
         migration_state["process"] = None
         return False
 
+
 def run_restore(target_conn: str, file_path: str, options: dict):
-    """Run pg_restore in a background thread"""
+    """Run pg_restore in a background thread with chunk-based progress tracking."""
     try:
         if not os.path.exists(file_path):
             log_message(f"Dump file not found: {file_path}", "error")
             with migration_lock:
-                migration_state["running"] = False  # Ensure state is consistent
+                migration_state["running"] = False
             return False
 
-        # Set environment variables for connection
         env = os.environ.copy()
 
-        # Run timescaledb_pre_restore() if specified
         if options.get("timescaledb_pre_restore", True):
             pre_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_pre_restore();"]
             log_message("Running timescaledb_pre_restore()", "info", " ".join(pre_restore_cmd))
@@ -280,22 +377,17 @@ def run_restore(target_conn: str, file_path: str, options: dict):
             if pre_restore_process.returncode != 0:
                 log_message(f"Pre-restore failed: {pre_restore_stderr or pre_restore_stdout}", "error")
                 with migration_lock:
-                    migration_state["running"] = False  # Ensure state is consistent
+                    migration_state["running"] = False
                 return False
 
-        # Build pg_restore command
         cmd = ["pg_restore", "-d", target_conn, "-v"]
-
-        # Add no-owner flag if specified
         if options.get("no_owner", True):
             cmd.append("--no-owner")
-        # Add clean flag if specified
         if options.get("clean", False):
             cmd.append("--clean")
-        # Add single transaction flag if specified
-        if options.get("single_transaction", True):
+        # Default OFF now:
+        if options.get("single_transaction", False):
             cmd.append("--single-transaction")
-        # Add file path
         cmd.append(file_path)
 
         log_message(f"Starting database restore from {file_path}", "info", " ".join(cmd))
@@ -305,12 +397,9 @@ def run_restore(target_conn: str, file_path: str, options: dict):
             migration_state["running"] = True
             migration_state["operation"] = "restore"
             migration_state["restore_completed"] = False
-            migration_state["progress"]["tables_completed"] = 0  # Reset counter
+            migration_state["progress"]["tables_completed"] = 0
 
-        # Use preexec_fn=os.setsid to create a new process group
-        preexec_fn_to_use = None
-        if hasattr(os, 'setsid'):
-            preexec_fn_to_use = os.setsid
+        preexec_fn_to_use = os.setsid if hasattr(os, 'setsid') else None
 
         process = subprocess.Popen(
             cmd,
@@ -318,51 +407,78 @@ def run_restore(target_conn: str, file_path: str, options: dict):
             stderr=subprocess.PIPE,
             env=env,
             text=True,
-            bufsize=1,  # Line buffering
+            bufsize=1,
             universal_newlines=True,
-            preexec_fn=preexec_fn_to_use  # Create a new process group
+            preexec_fn=preexec_fn_to_use
         )
 
         with migration_lock:
             migration_state["process"] = process
 
-        # Process output
         if process.stderr:
             for line in iter(process.stderr.readline, ''):
                 line = line.strip()
                 if not line:
                     continue
-                # Log verbose output
                 log_message(line, "info")
-                # Try to parse table name (pg_restore output format varies)
-                if "processing" in line.lower() and ("table data" in line.lower() or "table" in line.lower()):
+
+                if re.search(r'(processing|restoring)\s+data\s+for\s+table', line, re.IGNORECASE):
+                    qualified = None
                     try:
-                        # Attempt to extract table name, might need refinement
-                        parts = line.split()
-                        table_index = -1
-                        if "table" in parts: table_index = parts.index("table") + 1
-                        elif "data" in parts: table_index = parts.index("data") + 1
-
-                        if table_index > 0 and table_index < len(parts):
-                            table_name = parts[table_index].strip('."')
-                            with migration_lock:
-                                migration_state["progress"]["current_table"] = table_name
-                                migration_state["progress"]["tables_completed"] += 1
+                        quoted = re.findall(r'"([^"]+)"', line)
+                        if len(quoted) >= 2:
+                            qualified = f"{quoted[-2]}.{quoted[-1]}"
                         else:
-                            logger.warning(f"Could not parse table name from restore line: {line}")
+                            m = re.search(r'(processing|restoring)\s+data\s+for\s+table\s+([^\s]+)', line, re.IGNORECASE)
+                            if m:
+                                qualified = m.group(2).strip().strip('"')
                     except Exception as parse_err:
                         logger.warning(f"Error parsing restore line '{line}': {parse_err}")
-                # Check if process is still running
+
+                    if qualified:
+                        with migration_lock:
+                            migration_state["progress"]["current_table"] = qualified
+                            chunk_info = migration_state.get("chunk_info_internal")
+                            if chunk_info:
+                                entry = chunk_info["name_to_entry"].get(qualified)
+                                if entry:
+                                    counted = set(migration_state["progress"].get("counted_chunk_names", []))
+                                    names_to_mark = {entry["original_chunk"]}
+                                    if entry["compressed_chunk"]:
+                                        names_to_mark.add(entry["compressed_chunk"])
+                                    if not (counted & names_to_mark):
+                                        migration_state["progress"]["bytes_completed"] += entry["effective_size_bytes"]
+                                        migration_state["progress"]["chunks_completed"] += 1
+                                        counted |= names_to_mark
+                                        migration_state["progress"]["counted_chunk_names"] = list(counted)
+                                    total = migration_state["progress"].get("total_expected_bytes", 0) or 0
+                                    if total > 0:
+                                        migration_state["progress"]["percent_complete"] = min(
+                                            99,
+                                            int((migration_state["progress"]["bytes_completed"] / total) * 100)
+                                        )
+                                    elif migration_state["progress"]["chunks_total"]:
+                                        migration_state["progress"]["percent_complete"] = min(
+                                            99,
+                                            int(migration_state["progress"]["chunks_completed"] * 100 / migration_state["progress"]["chunks_total"])
+                                        )
+                                    elapsed = time.time() - (migration_state.get("start_time") or time.time())
+                                    if elapsed > 5 and migration_state["progress"]["bytes_completed"] > 0:
+                                        rate = migration_state["progress"]["bytes_completed"] / elapsed
+                                        remaining = total - migration_state["progress"]["bytes_completed"]
+                                        eta = int(remaining / rate) if rate > 0 and total > 0 else None
+                                        migration_state["progress"]["estimated_time_remaining"] = eta
+                            # Maintain legacy counter
+                            migration_state["progress"]["tables_completed"] += 1
+
                 with migration_lock:
                     if not migration_state["running"]:
-                        break  # Stop processing if process was terminated
+                        break
 
-        # Wait for process to complete
         stdout, stderr = process.communicate()
         exit_code = process.returncode
 
         post_restore_success = True
-        # Run timescaledb_post_restore() if specified and restore was successful so far
         if exit_code == 0 and options.get("timescaledb_post_restore", True):
             post_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_post_restore(); ANALYZE;"]
             log_message("Running timescaledb_post_restore() and ANALYZE", "info", " ".join(post_restore_cmd))
@@ -376,14 +492,15 @@ def run_restore(target_conn: str, file_path: str, options: dict):
             post_restore_stdout, post_restore_stderr = post_restore_process.communicate()
             if post_restore_process.returncode != 0:
                 log_message(f"Post-restore failed: {post_restore_stderr or post_restore_stdout}", "error")
-                post_restore_success = False  # Mark post-restore as failed
+                post_restore_success = False
 
         with migration_lock:
-            # Ensure running state is updated based on process completion
-            if migration_state["running"]:  # Only update if not stopped manually
+            if migration_state["running"]:
                 if exit_code == 0 and post_restore_success:
                     migration_state["restore_completed"] = True
                     migration_state["end_time"] = time.time()
+                    migration_state["progress"]["percent_complete"] = 100
+                    migration_state["progress"]["estimated_time_remaining"] = 0
                     total_time = migration_state["end_time"] - migration_state["start_time"]
                     log_message(
                         f"Database restore completed successfully. Time: {round(total_time, 2)} seconds",
@@ -392,7 +509,6 @@ def run_restore(target_conn: str, file_path: str, options: dict):
                 elif exit_code != 0:
                     error_message = stderr or stdout or "Unknown error during restore"
                     log_message(f"Database restore failed: {error_message}", "error")
-                    # If post_restore failed, it's already logged.
 
             migration_state["running"] = False
             migration_state["process"] = None
@@ -406,6 +522,7 @@ def run_restore(target_conn: str, file_path: str, options: dict):
         migration_state["process"] = None
         return False
 
+
 # Replace the old stop_current_process with the new one
 def stop_current_process():
     """Stop the current process with improved forceful termination"""
@@ -2305,7 +2422,7 @@ async def home(request: Request):
                 <label class="form-label" for="clean">Clean (drop) database objects before recreating (--clean)</label>
             </div>
             <div class="form-check mb-2">
-                <input class="form-check-input" type="checkbox" id="single-transaction" checked>
+                <input class="form-check-input" type="checkbox" id="single-transaction">
                 <label class="form-label" for="single-transaction">Restore as a single transaction (--single-transaction)</label>
             </div>
         </div>
@@ -2396,12 +2513,18 @@ async def home(request: Request):
         <div class="logs-container mt-4">
             <div class="logs-header">
                 <div class="logs-title">Activity Log</div>
-                <div class="logs-filters">
-                    <button class="log-filter active" data-level="all">All</button>
-                    <button class="log-filter" data-level="info">Info</button>
-                    <button class="log-filter" data-level="success">Success</button>
-                    <button class="log-filter" data-level="warning">Warning</button>
-                    <button class="log-filter" data-level="error">Error</button>
+                <div class="d-flex align-center gap-3">
+                    <div class="logs-filters">
+                        <button class="log-filter active" data-level="all">All</button>
+                        <button class="log-filter" data-level="info">Info</button>
+                        <button class="log-filter" data-level="success">Success</button>
+                        <button class="log-filter" data-level="warning">Warning</button>
+                        <button class="log-filter" data-level="error">Error</button>
+                    </div>
+                    <div class="form-check" title="When enabled, the log viewer will not auto-scroll to new entries">
+                        <input class="form-check-input" type="checkbox" id="scroll-lock">
+                        <label class="form-label" for="scroll-lock">Scroll Lock</label>
+                    </div>
                 </div>
             </div>
             <div class="logs-body" id="logs-output">
@@ -2547,6 +2670,24 @@ async def home(request: Request):
         const cancelConfirmBtn = document.getElementById('cancel-confirm-btn');
         const confirmActionBtn = document.getElementById('confirm-action-btn');
         const confirmModalBody = document.getElementById('confirm-modal-body');
+
+        const scrollLockCheckbox = document.getElementById('scroll-lock');
+        let scrollLock = JSON.parse(localStorage.getItem('scroll_lock') || 'false');
+        if (scrollLockCheckbox) {
+            scrollLockCheckbox.checked = scrollLock;
+        }
+        function maybeAutoScroll(el) {
+            if (!scrollLock && el) {
+                el.scrollTop = el.scrollHeight;
+            }
+        }
+        if (scrollLockCheckbox) {
+            scrollLockCheckbox.addEventListener('change', () => {
+                scrollLock = scrollLockCheckbox.checked;
+                localStorage.setItem('scroll_lock', JSON.stringify(scrollLock));
+            });
+        }
+
         // State variables
         let sizeChart = null;
         let updateInterval = null;
@@ -2642,8 +2783,11 @@ async def home(request: Request):
                 tab.classList.add('active');
                 const tabId = tab.getAttribute('data-tab');
                 document.getElementById(`${tabId}-tab`).classList.add('active');
+                // Persist active tab
+                localStorage.setItem('active_tab', tabId);
             });
         });
+
         // Log filter functionality
         logFilters.forEach(filter => {
             filter.addEventListener('click', () => {
@@ -3390,7 +3534,7 @@ async def home(request: Request):
 
             terminalOutput.appendChild(line);
             // Auto-scroll
-            terminalOutput.scrollTop = terminalOutput.scrollHeight;
+            maybeAutoScroll(terminalOutput);
         }
         // Add log entry
         function addLogEntry(log) {
@@ -3416,7 +3560,7 @@ async def home(request: Request):
                 logEntry.style.display = 'none';
             }
             // Scroll to bottom
-            logsOutput.scrollTop = logsOutput.scrollHeight;
+            maybeAutoScroll(logsOutput);
 
             // Also add to terminal, map log level to terminal type
             let terminalType = 'output';
@@ -3695,6 +3839,41 @@ async def home(request: Request):
             await updateStatus(); // This also updates migration_state
             // Add initial log check
             await checkForNewLogs();
+            const savedTab = localStorage.getItem('active_tab');
+            if (savedTab && document.querySelector(`.tab[data-tab="${savedTab}"]`)) {
+                document.querySelector(`.tab[data-tab="${savedTab}"]`).click();
+            }
+            // Reflect completed dump state on reload
+            if (migration_state && migration_state.dump_file) {
+                dumpProgressSection.classList.remove('hidden');
+                dumpFilePathElement.textContent = migration_state.dump_file;
+                currentSizeElement.textContent = `${(migration_state.dump_file_size || 0).toFixed(2)} MB`;
+                if (migration_state.dump_completed) {
+                    dumpStatusElement.textContent = 'Completed';
+                    dumpStatusElement.className = 'badge success';
+                    const downloadPath = `/downloads/${migration_state.dump_file.split(/[\\/]/).pop()}`;
+                    downloadDumpBtn.href = downloadPath;
+                    downloadDumpBtn.disabled = false;
+                    gotoRestoreFromDumpBtn.disabled = false;
+                    statusBadge.className = 'status-badge success';
+                    statusBadge.textContent = 'Dump Complete';
+                } else if (migration_state.running && migration_state.operation === 'dump') {
+                    dumpStatusElement.textContent = 'In Progress';
+                    dumpStatusElement.className = 'badge info';
+                }
+            }
+
+            // Reflect completed restore state on reload
+            if (migration_state && migration_state.restore_completed) {
+                restoreProgressSection.classList.remove('hidden');
+                restoreStatusElement.textContent = 'Completed';
+                restoreSubstatusElement.textContent = 'Restore completed successfully';
+                restoreProgressBar.style.width = '100%';
+                restoreProgressBar.classList.remove('animated');
+                restoreProgressValue.textContent = '100%';
+                statusBadge.className = 'status-badge success';
+                statusBadge.textContent = 'Restore Complete';
+            }
 
             // If a process was running when the page loaded, sync UI and start updates
             if (migration_state && migration_state.running) {
@@ -3715,6 +3894,7 @@ async def home(request: Request):
                     startDumpBtn.disabled = true;
                 }
             }
+
         }
         // Initialize app
         initialize();
@@ -3769,82 +3949,91 @@ async def home(request: Request):
 
 @app.post("/test-connection")
 async def test_connection_endpoint(data: Dict[str, str]):
-    """Test a database connection and get basic info"""
+    """Test a database connection and get basic info; if source, pre-load TimescaleDB chunk map."""
     try:
         connection_string = data.get("connection_string")
-        connection_type = data.get("connection_type", "source")  # Added type
+        connection_type = data.get("connection_type", "source")
        if not connection_string:
             return JSONResponse(
                 status_code=400,
                 content={"success": False, "message": "Connection string is required"}
             )
 
-        # Test the connection using internal logic
         if not test_connection_logic(connection_string):
-            # Error is logged within test_connection_logic
             return JSONResponse(
                 content={"success": False, "message": "Failed to connect to database"}
             )
 
-        # If connection successful, get database info
         conn = psycopg2.connect(connection_string)
         try:
+            chunk_summary_resp = None
            with conn.cursor() as cur:
-                # Get server info
                 cur.execute("SELECT version()")
                 version_result = cur.fetchone()
                 version = version_result[0] if version_result else "Unknown"
 
-                # Check if TimescaleDB is installed and get version
+                # Check TimescaleDB
                 is_timescaledb = False
                 ts_version = None
                 try:
-                    # Use EXISTS for better performance and error handling if extension not present
                     cur.execute("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'timescaledb');")
                     if cur.fetchone()[0]:
                         cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'")
                         ts_version_result = cur.fetchone()
                         is_timescaledb = ts_version_result is not None
                         ts_version = ts_version_result[0] if ts_version_result else None
                 except psycopg2.Error as ts_err:
                     logger.warning(f"Could not check TimescaleDB extension: {ts_err}")
-                    # Continue without TimescaleDB info if query fails
 
-                # Get database name
                 cur.execute("SELECT current_database()")
                 db_result = cur.fetchone()
                 database = db_result[0] if db_result else "Unknown"
 
-                # Extract server host/IP (best effort parsing)
                 server_match = "Unknown"
                 try:
-                    # Extract from connection string if possible (more reliable)
                     host_part = connection_string.split('@')[-1].split('/')[0].split(':')[0]
                     if host_part:
                         server_match = host_part
-                    # Fallback to parsing version string if needed
                     elif " on " in version:
                         server_match = version.split(" on ")[-1].split(",")[0]
                 except Exception:
                     logger.warning("Could not parse server host from connection string or version.")
 
+            # If it's the source and TimescaleDB is present, build and cache chunk info
+            if connection_type == "source" and is_timescaledb:
+                chunk_info = build_timescaledb_chunk_info(conn)
+                with migration_lock:
+                    migration_state["chunk_info_internal"] = chunk_info
+                if chunk_info and chunk_info.get("summary"):
+                    total_mb = round(chunk_info["summary"]["total_effective_size_bytes"] / (1024 * 1024), 2)
+                    chunk_summary_resp = {
+                        "chunks_count": chunk_info["summary"]["chunks_count"],
+                        "hypertables_count": chunk_info["summary"]["hypertables_count"],
+                        "total_effective_size_mb": total_mb
+                    }
+                    log_message(
+                        f"Loaded TimescaleDB chunk map: {chunk_summary_resp['chunks_count']} chunks across "
+                        f"{chunk_summary_resp['hypertables_count']} hypertables, ~{total_mb} MB effective data.",
+                        "info"
+                    )
 
             log_message(f"Successful connection test to {connection_type} database: {database} on {server_match}", "success")
             return JSONResponse(content={
                 "success": True,
                 "version": version,
                 "is_timescaledb": is_timescaledb,
-                "timescaledb_version": ts_version,  # Add this line
+                "timescaledb_version": ts_version,
                 "database": database,
-                "server": server_match
+                "server": server_match,
+                "chunk_summary": chunk_summary_resp  # May be None if not source or no TSDB
             })
         finally:
             conn.close()
     except psycopg2.Error as db_err:
         log_message(f"Database connection error during info fetch: {str(db_err)}", "error")
         return JSONResponse(
             content={"success": False, "message": f"Database error: {str(db_err)}"}
         )
     except Exception as e:
         log_message(f"Connection test failed unexpectedly: {str(e)}", "error")
         return JSONResponse(
@@ -3852,6 +4041,7 @@ async def test_connection_endpoint(data: Dict[str, str]):
             content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
         )
 
+
 @app.post("/database-info")
 async def get_database_info(data: Dict[str, str]):
     """Get additional database information like table count and size"""
@@ -3921,60 +4111,57 @@ async def start_dump(data: Dict[str, Any], background_tasks: BackgroundTasks):
                 content={"success": False, "message": "Source connection string is required"}
             )
 
-        # Basic validation: Test connection before starting dump
         if not test_connection_logic(source_conn):
             return JSONResponse(
                 status_code=400,
                 content={"success": False, "message": "Source connection failed. Cannot start dump."}
             )
 
-        # Stop any running process first (important!)
         if migration_state["running"]:
             logger.warning("Another process is running. Stopping it before starting dump.")
             stopped = stop_current_process()
             if not stopped:
                 logger.error("Failed to stop the existing process. Cannot start dump.")
                 return JSONResponse(
-                    status_code=500,  # Internal Server Error might be appropriate
+                    status_code=500,
                     content={"success": False, "message": "Failed to stop the currently running process."}
                 )
-            # Add a small delay to allow the process to fully terminate
             time.sleep(0.5)
 
-
-        # Create dump file path
         filename = options.get("filename", "timescale_backup").strip()
-        # Basic filename sanitization (replace spaces, avoid path traversal)
         filename = filename.replace(" ", "_").replace("..", "").replace("/", "").replace("\\", "")
-        if not filename: filename = "timescale_backup"  # Fallback if sanitization results in empty name
+        if not filename:
+            filename = "timescale_backup"
 
         format_flag = options.get("format", "c")
-
-        # Determine file extension
         extension = ".dump"
         if format_flag == "p":
             extension = ".sql"
         elif format_flag == "d":
-            extension = ""  # Directory format has no extension
+            extension = ""
         elif format_flag == "t":
             extension = ".tar"
 
-        # Generate file path carefully
-        dumps_dir = Path("dumps").resolve()  # Ensure absolute path
+        dumps_dir = Path("dumps").resolve()
         file_path = dumps_dir / f"{filename}{extension}"
-
-        # Prevent potential directory traversal if filename somehow still contains harmful chars
         if not str(file_path).startswith(str(dumps_dir)):
             logger.error(f"Invalid filename resulted in path traversal attempt: {filename}")
             return JSONResponse(
                 status_code=400,
                 content={"success": False, "message": "Invalid filename specified."}
             )
 
-        # Reset state before starting background task
+        # Read expected totals from chunk info (if any loaded from source test)
         with migration_lock:
-            migration_state["id"] = str(uuid.uuid4())  # New ID for new operation
-            migration_state["running"] = False  # Will be set true by run_dump
+            chunk_info = migration_state.get("chunk_info_internal")
+            expected_bytes = 0
+            chunks_total = 0
+            if chunk_info and "summary" in chunk_info:
+                expected_bytes = int(chunk_info["summary"].get("total_effective_size_bytes", 0) or 0)
+                chunks_total = int(chunk_info["summary"].get("chunks_count", 0) or 0)
+
+            migration_state["id"] = str(uuid.uuid4())
+            migration_state["running"] = False
             migration_state["operation"] = "dump"
             migration_state["start_time"] = None
             migration_state["end_time"] = None
@@ -3984,29 +4171,31 @@ async def start_dump(data: Dict[str, Any], background_tasks: BackgroundTasks):
             migration_state["dump_completed"] = False
             migration_state["restore_completed"] = False
             migration_state["last_activity"] = time.time()
-            # Keep logs or clear them? Let's keep them for now.
-            # migration_state["log"] = []
             migration_state["process"] = None
-            migration_state["progress"] = {  # Reset progress
+            migration_state["progress"] = {
                 "current_table": None,
                 "tables_completed": 0,
                 "total_tables": 0,
                 "current_size_mb": 0,
                 "growth_rate_mb_per_sec": 0,
                 "estimated_time_remaining": None,
-                "percent_complete": 0
+                "percent_complete": 0,
+                "total_expected_bytes": expected_bytes,
+                "bytes_completed": 0,
+                "chunks_completed": 0,
+                "chunks_total": chunks_total,
+                "counted_chunk_names": []
             }
 
+        if expected_bytes > 0:
+            log_message(f"Using chunk map for dump progress: {round(expected_bytes / (1024*1024), 2)} MB across {chunks_total} chunks.", "info")
 
-        # Start dump in background
         background_tasks.add_task(run_dump, source_conn, str(file_path), options)
 
-        # Create command preview (with redacted password)
-        # Redact password more carefully
         try:
             source_safe_preview = source_conn.replace(source_conn.split('://')[1].split(':')[1].split('@')[0], '***')
         except:
-            source_safe_preview = "postgres://user:***@host/db"  # Fallback preview
+            source_safe_preview = "postgres://user:***@host/db"
 
         cmd_preview = f'"{source_safe_preview}" -F{format_flag} -v'
         if options.get("compression") and options["compression"] != "default":
@@ -4015,7 +4204,6 @@ async def start_dump(data: Dict[str, Any], background_tasks: BackgroundTasks):
             cmd_preview += f' -n "{options["schema"]}"'
         cmd_preview += f' -f "{os.path.basename(file_path)}"'
 
-
         return JSONResponse(content={
             "success": True,
             "message": "Dump process initiated",
@@ -4029,6 +4217,7 @@ async def start_dump(data: Dict[str, Any], background_tasks: BackgroundTasks):
             content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
         )
 
+
 @app.post("/start-restore")
 async def start_restore(data: Dict[str, Any], background_tasks: BackgroundTasks):
     """Start a database restore process"""
@@ -4048,75 +4237,84 @@ async def start_restore(data: Dict[str, Any], background_tasks: BackgroundTasks)
                 content={"success": False, "message": "Dump file is required"}
             )
 
-        # Basic validation: Test connection before starting restore
         if not test_connection_logic(target_conn):
             return JSONResponse(
                 status_code=400,
                 content={"success": False, "message": "Target connection failed. Cannot start restore."}
             )
 
-        # Validate dump file path exists and is within the dumps directory
         dumps_dir = Path("dumps").resolve()
         dump_file_path = Path(dump_file).resolve()
         if not dump_file_path.exists() or not str(dump_file_path).startswith(str(dumps_dir)):
             logger.error(f"Invalid or non-existent dump file specified: {dump_file}")
             return JSONResponse(
                 status_code=400,
                 content={"success": False, "message": "Invalid or non-existent dump file selected."}
             )
 
-        # Stop any running process first (important!)
         if migration_state["running"]:
             logger.warning("Another process is running. Stopping it before starting restore.")
             stopped = stop_current_process()
             if not stopped:
                 logger.error("Failed to stop the existing process. Cannot start restore.")
                 return JSONResponse(
                     status_code=500,
                     content={"success": False, "message": "Failed to stop the currently running process."}
                 )
-            time.sleep(0.5)  # Allow time for termination
+            time.sleep(0.5)
 
-        # Reset state before starting background task
         with migration_lock:
-            migration_state["id"] = str(uuid.uuid4())  # New ID for new operation
-            migration_state["running"] = False  # Will be set true by run_restore
+            chunk_info = migration_state.get("chunk_info_internal")
+            expected_bytes = 0
+            chunks_total = 0
+            if chunk_info and "summary" in chunk_info:
+                expected_bytes = int(chunk_info["summary"].get("total_effective_size_bytes", 0) or 0)
+                chunks_total = int(chunk_info["summary"].get("chunks_count", 0) or 0)
+
+            migration_state["id"] = str(uuid.uuid4())
+            migration_state["running"] = False
             migration_state["operation"] = "restore"
             migration_state["start_time"] = None
             migration_state["end_time"] = None
-            migration_state["dump_file"] = None  # Not relevant for restore state itself
+            migration_state["dump_file"] = None
            migration_state["dump_file_size"] = 0
             migration_state["previous_size"] = 0
             migration_state["dump_completed"] = False
             migration_state["restore_completed"] = False
             migration_state["last_activity"] = time.time()
-            # Keep logs
             migration_state["process"] = None
-            migration_state["progress"] = {  # Reset progress
+            migration_state["progress"] = {
                 "current_table": None,
                 "tables_completed": 0,
-                "total_tables": 0,  # We don't easily know this for restore
+                "total_tables": 0,
                 "current_size_mb": 0,
                 "growth_rate_mb_per_sec": 0,
                 "estimated_time_remaining": None,
-                "percent_complete": 0
+                "percent_complete": 0,
+                "total_expected_bytes": expected_bytes,
+                "bytes_completed": 0,
+                "chunks_completed": 0,
+                "chunks_total": chunks_total,
+                "counted_chunk_names": []
             }
 
-        # Start restore in background
+        if expected_bytes > 0:
+            log_message(f"Using chunk map for restore progress: {round(expected_bytes / (1024*1024), 2)} MB across {chunks_total} chunks.", "info")
+
         background_tasks.add_task(run_restore, target_conn, str(dump_file_path), options)
 
-        # Create command preview (with redacted password)
         try:
             target_safe_preview = target_conn.replace(target_conn.split('://')[1].split(':')[1].split('@')[0], '***')
         except:
             target_safe_preview = "postgres://user:***@host/db"
 
         cmd_preview = f'-d "{target_safe_preview}" -v'
         if options.get("no_owner", True):
             cmd_preview += " --no-owner"
         if options.get("clean", False):
             cmd_preview += " --clean"
-        if options.get("single_transaction", True):
+        # Default OFF now for single-transaction
+        if options.get("single_transaction", False):
             cmd_preview += " --single-transaction"
         cmd_preview += f' "{os.path.basename(dump_file)}"'
 
@@ -4132,6 +4330,7 @@ async def start_restore(data: Dict[str, Any], background_tasks: BackgroundTasks)
             content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
         )
 
+
 @app.post("/stop-process")
 async def stop_process_endpoint():
     """Stop the current database process"""
@@ -4171,18 +4370,33 @@ async def stop_process_endpoint():
 
 @app.get("/status")
 async def get_status():
-    """Get the current migration status"""
-    # Return a copy to avoid potential modification issues if state grows complex
+    """Get the current migration status (without heavy internal structures)."""
     with migration_lock:
         state_copy = migration_state.copy()
-        # Ensure process object is not sent over JSON
         state_copy["process"] = None
-        # Optionally limit log size sent back if it gets large
-        # MAX_LOGS_IN_STATUS = 100
-        # if len(state_copy["log"]) > MAX_LOGS_IN_STATUS:
-        #     state_copy["log"] = state_copy["log"][-MAX_LOGS_IN_STATUS:]
+
+        # Remove heavy internal chunk details; expose a small summary instead
+        if state_copy.get("chunk_info_internal"):
+            chunk_info = state_copy["chunk_info_internal"]
+            summary = chunk_info.get("summary", {}) if chunk_info else {}
+            if "total_effective_size_bytes" in summary:
+                total_mb = round(summary["total_effective_size_bytes"] / (1024 * 1024), 2)
+            else:
+                total_mb = None
+            state_copy["chunk_summary"] = {
+                "chunks_count": summary.get("chunks_count"),
+                "hypertables_count": summary.get("hypertables_count"),
+                "total_effective_size_mb": total_mb
+            }
+            del state_copy["chunk_info_internal"]
+
+        # Trim internal counted names (can be large)
+        if "progress" in state_copy and "counted_chunk_names" in state_copy["progress"]:
+            state_copy["progress"]["counted_chunk_names"] = []
+
        return state_copy
 
+
 @app.post("/clear-logs")
 async def clear_logs():
     """Clear all logs"""