arcticaurora commited on
Commit
6639025
·
verified ·
1 Parent(s): f4c0131

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +234 -411
app.py CHANGED
@@ -12,12 +12,12 @@ import time
12
  import json
13
  import datetime
14
  import uuid
15
- import shutil
16
- from typing import Dict, Any, Optional, List
17
  from pathlib import Path
18
  import psycopg2
19
  import logging
20
  import signal
 
21
 
22
  # --- Setup Logging ---
23
  logging.basicConfig(
@@ -44,7 +44,6 @@ os.makedirs("static/css", exist_ok=True)
44
  os.makedirs("static/js", exist_ok=True)
45
  os.makedirs("dumps", exist_ok=True)
46
 
47
- # Create placeholder files if they don't exist so the app can run
48
  if not os.path.exists("templates/index.html"):
49
  with open("templates/index.html", "w") as f:
50
  f.write("<!DOCTYPE html><html><head><title>Migrator</title></head><body>Loading...</body></html>")
@@ -54,12 +53,11 @@ templates = Jinja2Templates(directory="templates")
54
  app.mount("/static", StaticFiles(directory="static"), name="static")
55
  app.mount("/downloads", StaticFiles(directory="dumps"), name="downloads")
56
 
57
-
58
  # --- Global State for Migration ---
59
  migration_state = {
60
  "id": str(uuid.uuid4()),
61
  "running": False,
62
- "operation": None, # "dump" or "restore"
63
  "start_time": None,
64
  "end_time": None,
65
  "dump_file": None,
@@ -77,277 +75,168 @@ migration_state = {
77
  "size_processed_bytes": 0,
78
  "percent_complete_by_size": 0,
79
  "percent_complete_by_count": 0,
80
- "manifest": None, # Will store the detailed chunk manifest
81
- # Legacy fields for non-TimescaleDB or fallback
82
- "tables_completed": 0,
83
- "total_tables": 0,
84
- "current_size_mb": 0,
85
- "growth_rate_mb_per_sec": 0,
86
  }
87
  }
88
-
89
- # Lock for updating global state
90
  migration_lock = threading.Lock()
91
 
92
- def log_message(message: str, level: str = "info", command: str = None):
93
- """Add timestamped log message with level"""
 
94
  timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
95
- log_entry = {
96
- "timestamp": timestamp,
97
- "message": message,
98
- "level": level,
99
- "command": command,
100
- "id": len(migration_state["log"])
101
- }
102
  with migration_lock:
103
  migration_state["log"].append(log_entry)
104
  migration_state["last_activity"] = time.time()
105
  logger.info(f"[{level.upper()}] {message}")
106
- if command:
107
- logger.info(f"Command: {command}")
108
 
109
  def test_connection_logic(connection_string: str) -> bool:
110
- """Test a PostgreSQL connection string (internal logic)"""
111
  try:
112
- conn = psycopg2.connect(connection_string)
113
- conn.close()
114
- return True
115
  except Exception as e:
116
- logger.error(f"Connection test failed: {str(e)}")
117
  return False
118
 
119
  def get_chunk_manifest(connection_string: str) -> Dict[str, Any]:
120
- """
121
- Connects to a TimescaleDB and generates a manifest of all hypertable chunks
122
- (both regular and compressed) and their sizes.
123
- """
124
- manifest = {
125
- "total_chunks": 0,
126
- "total_size_bytes": 0,
127
- "chunks": {} # Format: {'schema.chunk_name': size_in_bytes}
128
- }
129
- log_message("Attempting to generate chunk manifest from source database...", "info")
130
  try:
131
- conn = psycopg2.connect(connection_string)
132
- with conn.cursor() as cur:
133
- # This query joins chunks with their respective hypertables to get a full list
134
- # of all chunk tables (including compressed ones) that pg_dump will process.
135
- query = """
136
- SELECT
137
- c.schema_name || '.' || c.table_name as chunk_fqn,
138
- pg_total_relation_size(c.schema_name || '.' || c.table_name) as chunk_size_bytes
139
- FROM _timescaledb_catalog.chunk c;
140
- """
141
- cur.execute(query)
142
- results = cur.fetchall()
143
-
144
- if not results:
145
- log_message("No TimescaleDB chunks found. Proceeding with standard progress tracking.", "warning")
146
- return manifest
147
-
148
- for row in results:
149
- chunk_name, chunk_size = row
150
- if chunk_size is None: continue # Skip if size is null for any reason
151
- manifest["chunks"][chunk_name] = chunk_size
152
- manifest["total_size_bytes"] += chunk_size
153
-
154
- manifest["total_chunks"] = len(manifest["chunks"])
155
- log_message(
156
- f"Successfully generated manifest: {manifest['total_chunks']} chunks found, "
157
- f"total size {round(manifest['total_size_bytes'] / (1024**3), 2)} GB.",
158
- "success"
159
- )
160
- return manifest
161
  except Exception as e:
162
  log_message(f"Failed to generate chunk manifest: {e}. Will use basic progress tracking.", "error")
163
- # Return an empty manifest on failure so the process can continue
164
- return { "total_chunks": 0, "total_size_bytes": 0, "chunks": {} }
165
- finally:
166
- if 'conn' in locals() and conn:
167
- conn.close()
168
-
169
- def run_dump(source_conn: str, file_path: str, options: dict):
170
- """Run pg_dump in a background thread with enhanced progress tracking."""
171
- try:
172
- absolute_file_path = os.path.abspath(file_path)
173
- if os.path.exists(absolute_file_path):
174
- os.remove(absolute_file_path)
175
-
176
- env = os.environ.copy()
177
- format_flag = "-F" + options.get("format", "c")
178
- cmd = ["pg_dump", source_conn, format_flag, "-v", "-f", absolute_file_path]
179
-
180
- if options.get("schema"):
181
- cmd.extend(["-n", options["schema"]])
182
- if options.get("compression") and options["compression"] != "default":
183
- cmd.extend(["-Z", options["compression"]])
 
 
 
 
 
 
 
 
 
 
 
184
 
185
- log_message(f"Starting database dump to {absolute_file_path}", "info", " ".join(cmd))
186
 
187
- with migration_lock:
188
- migration_state["start_time"] = time.time()
189
- migration_state["running"] = True
190
- migration_state["operation"] = "dump"
191
- migration_state["dump_file"] = absolute_file_path
192
- migration_state["dump_completed"] = False
193
-
194
- process = subprocess.Popen(
195
- cmd,
196
- stdout=subprocess.PIPE,
197
- stderr=subprocess.PIPE,
198
- env=env,
199
- text=True,
200
- bufsize=1
201
- )
202
 
 
 
 
203
  with migration_lock:
204
  migration_state["process"] = process
205
 
206
- def read_stderr():
207
- """Reads stderr from pg_dump and updates progress based on the manifest."""
208
- for line in iter(process.stderr.readline, ''):
209
- line = line.strip()
210
- if not line:
211
- continue
212
-
213
- log_message(line, "info") # Log all verbose output
214
-
215
- # Check for table data dumping lines
216
- if "dumping data for table" in line:
217
- try:
218
- # Extract the fully-qualified table/chunk name
219
- parts = line.split(" for table ")[1].split(" ")
220
- table_name_full = parts[0]
221
-
222
- with migration_lock:
223
- migration_state["progress"]["current_table"] = table_name_full
224
- manifest = migration_state["progress"].get("manifest")
225
-
226
- # Use manifest for accurate progress if available
227
- if manifest and manifest.get("chunks"):
228
- chunk_size = manifest["chunks"].get(table_name_full)
229
- if chunk_size is not None: # It's a chunk we're tracking
230
- migration_state["progress"]["chunks_processed"] += 1
231
- migration_state["progress"]["size_processed_bytes"] += chunk_size
232
-
233
- # Recalculate percentages
234
- if migration_state["progress"]["total_chunks"] > 0:
235
- migration_state["progress"]["percent_complete_by_count"] = round(
236
- (migration_state["progress"]["chunks_processed"] / migration_state["progress"]["total_chunks"]) * 100, 2
237
- )
238
- if migration_state["progress"]["total_size_bytes"] > 0:
239
- migration_state["progress"]["percent_complete_by_size"] = round(
240
- (migration_state["progress"]["size_processed_bytes"] / migration_state["progress"]["total_size_bytes"]) * 100, 2
241
- )
242
- else:
243
- # It's a regular table, not a chunk
244
- migration_state["progress"]["tables_completed"] += 1
245
- else:
246
- # Fallback to simple table counting
247
- migration_state["progress"]["tables_completed"] += 1
248
-
249
- except Exception as e:
250
- log_message(f"Error parsing pg_dump output line '{line}': {e}", "warning")
251
-
252
- stderr_thread = threading.Thread(target=read_stderr, daemon=True)
253
  stderr_thread.start()
254
-
255
  exit_code = process.wait()
256
  stderr_thread.join(timeout=2.0)
257
 
258
- final_size = 0
259
- if os.path.exists(absolute_file_path):
260
- final_size = os.path.getsize(absolute_file_path)
261
-
262
  with migration_lock:
263
  migration_state["dump_file_size"] = final_size / (1024 * 1024) # MB
264
  migration_state["end_time"] = time.time()
265
  migration_state["running"] = False
266
  migration_state["process"] = None
267
- if exit_code == 0 and final_size > 0:
268
- migration_state["dump_completed"] = True
 
 
 
 
 
 
 
 
 
269
  total_time = migration_state["end_time"] - migration_state["start_time"]
270
- log_message(
271
- f"Database dump completed successfully. Size: {migration_state['dump_file_size']:.2f} MB. Time: {total_time:.2f} seconds",
272
- "success"
273
- )
274
  else:
275
- migration_state["dump_completed"] = False
276
- if exit_code != 0:
277
- log_message(f"Database dump failed with exit code {exit_code}", "error")
278
- elif final_size == 0:
279
- log_message(f"Dump file was created but is empty: {absolute_file_path}", "error")
280
 
281
  except Exception as e:
282
- log_message(f"An unexpected error occurred during database dump: {str(e)}", "error")
283
  with migration_lock:
284
  migration_state["running"] = False
285
  migration_state["process"] = None
286
 
287
  def run_restore(target_conn: str, file_path: str, options: dict):
288
- """Run pg_restore in a background thread."""
289
  try:
290
- if not os.path.exists(file_path):
291
- log_message(f"Dump file not found: {file_path}", "error")
292
- return
293
-
294
- env = os.environ.copy()
295
-
296
  # Pre-restore step
297
  if options.get("timescaledb_pre_restore", True):
298
  pre_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_pre_restore();"]
299
- log_message("Running timescaledb_pre_restore()", "info", " ".join(pre_restore_cmd))
300
- pre_restore_proc = subprocess.run(pre_restore_cmd, capture_output=True, text=True, env=env)
301
  if pre_restore_proc.returncode != 0:
302
  log_message(f"Pre-restore failed: {pre_restore_proc.stderr or pre_restore_proc.stdout}", "error")
303
  return
304
 
305
- # Build pg_restore command
306
  cmd = ["pg_restore", "-d", target_conn, "-v"]
307
  if options.get("no_owner", True): cmd.append("--no-owner")
308
  if options.get("clean", False): cmd.append("--clean")
309
- # The single-transaction option is removed as per user request.
310
- # if options.get("single_transaction", True): cmd.append("--single-transaction")
311
  cmd.append(file_path)
312
 
313
- log_message(f"Starting database restore from {file_path}", "info", " ".join(cmd))
314
-
315
- with migration_lock:
316
- migration_state["start_time"] = time.time()
317
- migration_state["running"] = True
318
- migration_state["operation"] = "restore"
319
- migration_state["restore_completed"] = False
320
-
321
- process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, text=True, bufsize=1)
322
  with migration_lock:
323
  migration_state["process"] = process
324
 
325
- # For restore, we use the simpler table counting as we don't have a pre-restore manifest
326
- def read_stderr():
327
- for line in iter(process.stderr.readline, ''):
328
- line = line.strip()
329
- if not line: continue
330
- log_message(line, "info")
331
- if "processing" in line.lower() and "table data" in line.lower():
332
- try:
333
- table_name = line.split("table data ")[1].split(" ")[0]
334
- with migration_lock:
335
- migration_state["progress"]["current_table"] = table_name
336
- migration_state["progress"]["tables_completed"] += 1
337
- except Exception as e:
338
- logger.warning(f"Could not parse table name from restore line: {line}, Error: {e}")
339
-
340
- stderr_thread = threading.Thread(target=read_stderr, daemon=True)
341
  stderr_thread.start()
342
-
343
  exit_code = process.wait()
344
  stderr_thread.join(timeout=2.0)
345
 
346
  post_restore_success = True
347
  if exit_code == 0 and options.get("timescaledb_post_restore", True):
348
  post_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_post_restore(); ANALYZE;"]
349
- log_message("Running timescaledb_post_restore() and ANALYZE", "info", " ".join(post_restore_cmd))
350
- post_restore_proc = subprocess.run(post_restore_cmd, capture_output=True, text=True, env=env)
351
  if post_restore_proc.returncode != 0:
352
  log_message(f"Post-restore failed: {post_restore_proc.stderr or post_restore_proc.stdout}", "error")
353
  post_restore_success = False
@@ -356,285 +245,219 @@ def run_restore(target_conn: str, file_path: str, options: dict):
356
  migration_state["end_time"] = time.time()
357
  migration_state["running"] = False
358
  migration_state["process"] = None
359
- if exit_code == 0 and post_restore_success:
360
- migration_state["restore_completed"] = True
 
361
  total_time = migration_state["end_time"] - migration_state["start_time"]
362
- log_message(f"Database restore completed successfully. Time: {total_time:.2f} seconds", "success")
363
  else:
364
- migration_state["restore_completed"] = False
365
- if exit_code != 0:
366
- log_message(f"Database restore failed with exit code {exit_code}", "error")
367
 
368
  except Exception as e:
369
- log_message(f"An unexpected error occurred during database restore: {e}", "error")
370
  with migration_lock:
371
  migration_state["running"] = False
372
  migration_state["process"] = None
373
 
374
  def stop_current_process():
375
- """Stop the current running process forcefully."""
376
  with migration_lock:
377
  process = migration_state.get("process")
378
  if not process or not migration_state.get("running"):
379
- log_message("Stop command received, but no process was running.", "info")
380
- if migration_state["running"]: # Correct inconsistent state
381
- migration_state["running"] = False
382
  return False
383
-
384
  try:
385
  pid = process.pid
386
- operation = migration_state["operation"]
387
- log_message(f"Attempting to stop {operation} process (PID: {pid})...", "warning")
388
-
389
- if process.poll() is not None:
390
- log_message(f"Process (PID: {pid}) already terminated.", "info")
391
  else:
392
- # Use os.killpg to terminate the entire process group, which is more reliable
393
- # for stopping child processes spawned by shell commands.
394
- if hasattr(os, 'killpg') and hasattr(os, 'getpgid'):
395
- try:
396
- os.killpg(os.getpgid(pid), signal.SIGKILL)
397
- log_message(f"Sent SIGKILL to process group of PID {pid}.", "warning")
398
- except ProcessLookupError:
399
- log_message(f"Process group for PID {pid} not found, it likely terminated.", "info")
400
- except Exception as e:
401
- log_message(f"Failed to kill process group for PID {pid}: {e}. Falling back to process.kill().", "error")
402
- process.kill() # Fallback for other errors or non-Unix systems
403
- else:
404
- process.kill()
405
- log_message(f"Sent SIGKILL directly to process (PID: {pid}).", "warning")
406
-
407
- # Wait a moment for the OS to process the kill signal
408
- process.wait(timeout=5)
409
-
410
- log_message(f"Database {operation} operation stopped by user.", "warning")
411
  return True
412
- except (ProcessLookupError, subprocess.TimeoutExpired):
413
- log_message(f"Process was already gone or did not terminate in time.", "info")
414
- return True # Consider it a success as the process is no longer running
415
  except Exception as e:
416
  log_message(f"Error stopping process: {e}", "error")
417
  return False
418
  finally:
419
- # Always update state to reflect the stop action
420
  migration_state["running"] = False
421
  migration_state["process"] = None
422
  migration_state["end_time"] = time.time()
423
 
424
-
425
  # --- API Endpoints ---
426
 
427
  @app.get("/", response_class=HTMLResponse)
428
  async def home(request: Request):
429
- """Serve the main HTML file."""
430
  return templates.TemplateResponse("index.html", {"request": request})
431
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
432
  @app.post("/start-dump")
433
  async def start_dump_endpoint(data: Dict[str, Any], background_tasks: BackgroundTasks):
434
- """Start a database dump process after generating a chunk manifest."""
435
  source_conn = data.get("source_conn")
436
  options = data.get("options", {})
437
- if not source_conn:
438
- raise HTTPException(status_code=400, detail="Source connection string is required")
439
- if not test_connection_logic(source_conn):
440
- raise HTTPException(status_code=400, detail="Source connection failed. Cannot start dump.")
441
-
442
  if migration_state["running"]:
443
- log_message("Stop command issued to clear running process for new dump.", "warning")
444
  stop_current_process()
445
- time.sleep(1) # Give a moment for the process to die
446
-
447
- # Generate the detailed chunk manifest before starting
448
- manifest = get_chunk_manifest(source_conn)
449
 
450
- # Setup file path
451
- filename = options.get("filename", "timescale_backup").strip().replace(" ", "_")
452
- filename = "".join(c for c in filename if c.isalnum() or c in ('_','-'))
453
- if not filename: filename = "timescale_backup"
454
-
455
- format_flag = options.get("format", "c")
456
- extension = ".dump"
457
- if format_flag == "p": extension = ".sql"
458
- elif format_flag == "d": extension = ""
459
- elif format_flag == "t": extension = ".tar"
460
 
461
- dumps_dir = Path("dumps").resolve()
462
- file_path = dumps_dir / f"{filename}{extension}"
463
- if not str(file_path).startswith(str(dumps_dir)):
464
- raise HTTPException(status_code=400, detail="Invalid filename specified.")
465
 
466
- # Reset state with manifest data
467
  with migration_lock:
468
- migration_state["id"] = str(uuid.uuid4())
469
- migration_state["operation"] = "dump"
470
- migration_state["dump_file"] = str(file_path)
471
- migration_state["log"] = [] # Clear logs for new operation
472
- migration_state["progress"] = {
473
- "current_table": None,
474
- "total_chunks": manifest["total_chunks"],
475
- "chunks_processed": 0,
476
- "total_size_bytes": manifest["total_size_bytes"],
477
- "size_processed_bytes": 0,
478
- "percent_complete_by_size": 0,
479
- "percent_complete_by_count": 0,
480
- "manifest": manifest,
481
- "tables_completed": 0,
482
- "total_tables": 0,
483
- "current_size_mb": 0,
484
- "growth_rate_mb_per_sec": 0,
485
- }
486
- # Reset other state variables
487
- migration_state["running"] = False
488
- migration_state["start_time"] = None
489
- migration_state["end_time"] = None
490
- migration_state["dump_completed"] = False
491
- migration_state["restore_completed"] = False
492
-
493
- background_tasks.add_task(run_dump, source_conn, str(file_path), options)
494
- return JSONResponse(content={"success": True, "message": "Dump process initiated"})
495
 
496
  @app.post("/start-restore")
497
  async def start_restore_endpoint(data: Dict[str, Any], background_tasks: BackgroundTasks):
498
- """Start a database restore process."""
499
  target_conn = data.get("target_conn")
500
  dump_file = data.get("dump_file")
501
  options = data.get("options", {})
502
- if not target_conn or not dump_file:
503
- raise HTTPException(status_code=400, detail="Target connection and dump file are required.")
504
- if not test_connection_logic(target_conn):
505
- raise HTTPException(status_code=400, detail="Target connection failed. Cannot start restore.")
506
-
507
- dumps_dir = Path("dumps").resolve()
508
- dump_file_path = Path(dump_file).resolve()
509
- if not dump_file_path.exists() or not str(dump_file_path).startswith(str(dumps_dir)):
510
- raise HTTPException(status_code=400, detail="Invalid or non-existent dump file selected.")
511
-
512
  if migration_state["running"]:
513
- log_message("Stop command issued to clear running process for new restore.", "warning")
514
  stop_current_process()
515
  time.sleep(1)
516
 
517
- # Reset state for restore operation
 
 
 
 
 
 
 
 
 
 
518
  with migration_lock:
519
- migration_state["id"] = str(uuid.uuid4())
520
- migration_state["operation"] = "restore"
521
- migration_state["log"] = [] # Clear logs
522
- migration_state["progress"] = { # Reset to defaults for restore
523
- "current_table": None, "total_chunks": 0, "chunks_processed": 0,
524
- "total_size_bytes": 0, "size_processed_bytes": 0, "percent_complete_by_size": 0,
525
- "percent_complete_by_count": 0, "manifest": None, "tables_completed": 0,
526
- "total_tables": 0, "current_size_mb": 0, "growth_rate_mb_per_sec": 0,
527
- }
528
- migration_state["running"] = False
529
- migration_state["start_time"] = None
530
- migration_state["end_time"] = None
531
- migration_state["dump_completed"] = False
532
- migration_state["restore_completed"] = False
533
-
534
- background_tasks.add_task(run_restore, target_conn, str(dump_file_path), options)
535
- return JSONResponse(content={"success": True, "message": "Restore process initiated"})
536
 
537
  @app.post("/stop-process")
538
  async def stop_process_endpoint():
539
- """Stop the current database process."""
540
- stopped = stop_current_process()
541
- if stopped:
542
- return JSONResponse(content={"success": True, "message": "Process stop initiated."})
543
- else:
544
- # Check if it wasn't running or if stop failed
545
- with migration_lock:
546
- is_running = migration_state["running"]
547
- if not is_running:
548
- return JSONResponse(content={"success": False, "message": "No process was running to stop."})
549
- else:
550
- return JSONResponse(status_code=500, content={"success": False, "message": "Failed to stop the process. Check logs."})
551
 
552
  @app.get("/status")
553
  async def get_status():
554
- """Get the current migration status."""
555
  with migration_lock:
556
- # Create a deep copy to avoid race conditions and to remove non-serializable objects
557
  state_copy = {k: v for k, v in migration_state.items() if k != "process"}
558
- # Don't send the full manifest back in every status update to save bandwidth
559
  if "progress" in state_copy and "manifest" in state_copy["progress"]:
560
- state_copy["progress"]["manifest"] = None
561
  return JSONResponse(content=state_copy)
562
 
563
  @app.get("/list-dumps")
564
  async def list_dumps():
565
- """List available dump files."""
566
- try:
567
- dumps_dir = Path("dumps")
568
- dump_files = []
569
- for f in dumps_dir.iterdir():
570
- if f.is_file() or f.is_dir():
571
- try:
572
- stat = f.stat()
573
- size = sum(file.stat().st_size for file in f.rglob('*')) if f.is_dir() else stat.st_size
574
- dump_files.append({
575
- "name": f.name,
576
- "path": str(f),
577
- "size_mb": size / (1024 * 1024),
578
- "date": datetime.datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
579
- "is_dir": f.is_dir()
580
- })
581
- except OSError as e:
582
- logger.error(f"Could not stat file/dir {f.name}: {e}")
583
- dump_files.sort(key=lambda x: x["date"], reverse=True)
584
- return JSONResponse(content={"success": True, "dumps": dump_files})
585
- except Exception as e:
586
- logger.error(f"Failed to list dumps: {e}")
587
- raise HTTPException(status_code=500, detail=f"Failed to list dumps: {e}")
588
 
589
  @app.get("/downloads/{file_name:path}")
590
  async def download_file(file_name: str):
591
- """Download a dump file."""
592
- dumps_dir = Path("dumps").resolve()
593
- file_path = (dumps_dir / file_name).resolve()
594
- if not file_path.exists() or not str(file_path).startswith(str(dumps_dir)):
595
- raise HTTPException(status_code=404, detail="File not found or invalid path")
596
- if file_path.is_dir():
597
- raise HTTPException(status_code=400, detail="Directory downloads are not supported.")
598
- return FileResponse(path=str(file_path), filename=file_name, media_type="application/octet-stream")
599
-
600
- # --- Other Endpoints (Test Connection, etc.) ---
601
- # These are largely unchanged but included for completeness.
602
 
603
  @app.post("/test-connection")
604
  async def test_connection_endpoint(data: Dict[str, str]):
605
- """Test a database connection and get basic info"""
606
  connection_string = data.get("connection_string")
607
- if not connection_string:
608
- raise HTTPException(status_code=400, detail="Connection string is required")
609
- if not test_connection_logic(connection_string):
610
- return JSONResponse(content={"success": False, "message": "Failed to connect to database"})
611
-
612
  try:
613
- conn = psycopg2.connect(connection_string)
614
- with conn.cursor() as cur:
615
- cur.execute("SELECT version()")
616
- version = cur.fetchone()[0]
617
- cur.execute("SELECT current_database()")
618
- database = cur.fetchone()[0]
619
-
620
- is_timescaledb, ts_version = False, None
621
- cur.execute("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'timescaledb');")
622
- if cur.fetchone()[0]:
623
- is_timescaledb = True
624
- cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'")
625
- ts_version = cur.fetchone()[0]
626
-
627
- log_message(f"Successful connection test to database: {database}", "success")
628
- return JSONResponse(content={
629
  "success": True, "version": version, "is_timescaledb": is_timescaledb,
630
  "timescaledb_version": ts_version, "database": database
631
  })
632
  except Exception as e:
633
- log_message(f"Connection test failed during info fetch: {e}", "error")
634
  raise HTTPException(status_code=500, detail=f"Database error: {e}")
635
- finally:
636
- if 'conn' in locals() and conn:
637
- conn.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
638
 
639
  if __name__ == "__main__":
640
  import uvicorn
 
12
  import json
13
  import datetime
14
  import uuid
15
+ from typing import Dict, Any
 
16
  from pathlib import Path
17
  import psycopg2
18
  import logging
19
  import signal
20
+ import re
21
 
22
  # --- Setup Logging ---
23
  logging.basicConfig(
 
44
  os.makedirs("static/js", exist_ok=True)
45
  os.makedirs("dumps", exist_ok=True)
46
 
 
47
  if not os.path.exists("templates/index.html"):
48
  with open("templates/index.html", "w") as f:
49
  f.write("<!DOCTYPE html><html><head><title>Migrator</title></head><body>Loading...</body></html>")
 
53
  app.mount("/static", StaticFiles(directory="static"), name="static")
54
  app.mount("/downloads", StaticFiles(directory="dumps"), name="downloads")
55
 
 
56
  # --- Global State for Migration ---
57
  migration_state = {
58
  "id": str(uuid.uuid4()),
59
  "running": False,
60
+ "operation": None,
61
  "start_time": None,
62
  "end_time": None,
63
  "dump_file": None,
 
75
  "size_processed_bytes": 0,
76
  "percent_complete_by_size": 0,
77
  "percent_complete_by_count": 0,
78
+ "manifest_loaded": False,
79
+ "manifest": None, # Full manifest only stored during operation, not sent via status
80
+ "tables_completed": 0, # Fallback for non-manifest restores
 
 
 
81
  }
82
  }
 
 
83
  migration_lock = threading.Lock()
84
 
85
+ # --- Helper Functions ---
86
+
87
+ def log_message(message: str, level: str = "info"):
88
  timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
89
+ log_entry = {"timestamp": timestamp, "message": message, "level": level, "id": len(migration_state.get("log", []))}
 
 
 
 
 
 
90
  with migration_lock:
91
  migration_state["log"].append(log_entry)
92
  migration_state["last_activity"] = time.time()
93
  logger.info(f"[{level.upper()}] {message}")
 
 
94
 
95
  def test_connection_logic(connection_string: str) -> bool:
 
96
  try:
97
+ with psycopg2.connect(connection_string) as conn:
98
+ return True
 
99
  except Exception as e:
100
+ logger.error(f"Connection test failed: {e}")
101
  return False
102
 
103
  def get_chunk_manifest(connection_string: str) -> Dict[str, Any]:
104
+ manifest = {"total_chunks": 0, "total_size_bytes": 0, "chunks": {}}
105
+ log_message("Generating chunk manifest from source database...")
 
 
 
 
 
 
 
 
106
  try:
107
+ with psycopg2.connect(connection_string) as conn:
108
+ with conn.cursor() as cur:
109
+ query = """
110
+ SELECT c.schema_name || '.' || c.table_name as chunk_fqn,
111
+ pg_total_relation_size(c.schema_name || '.' || c.table_name) as chunk_size_bytes
112
+ FROM _timescaledb_catalog.chunk c;
113
+ """
114
+ cur.execute(query)
115
+ for row in cur.fetchall():
116
+ chunk_name, chunk_size = row
117
+ if chunk_size is not None:
118
+ manifest["chunks"][chunk_name] = chunk_size
119
+ manifest["total_size_bytes"] += chunk_size
120
+ manifest["total_chunks"] = len(manifest["chunks"])
121
+ if manifest["total_chunks"] > 0:
122
+ log_message(f"Manifest generated: {manifest['total_chunks']} chunks, total size {round(manifest['total_size_bytes'] / (1024**3), 2)} GB.", "success")
123
+ else:
124
+ log_message("No TimescaleDB chunks found. Will use standard progress tracking.", "warning")
125
+ return manifest
 
 
 
 
 
 
 
 
 
 
 
126
  except Exception as e:
127
  log_message(f"Failed to generate chunk manifest: {e}. Will use basic progress tracking.", "error")
128
+ return {"total_chunks": 0, "total_size_bytes": 0, "chunks": {}}
129
+
130
+ def process_stderr(process, operation):
131
+ """Generic stderr reader for dump and restore, supports manifest-based progress."""
132
+ for line in iter(process.stderr.readline, ''):
133
+ line = line.strip()
134
+ if not line: continue
135
+ log_message(line)
136
+
137
+ # Regex to handle various pg_dump/pg_restore verbose outputs for tables/chunks
138
+ match = re.search(r'(?:table|data for table|processing|from table)\s+"?([^".\s]+)"?\.?"?([^".\s]+)"?', line)
139
+ if match:
140
+ schema, table = match.groups()
141
+ table_name_full = f"{schema}.{table}"
142
+
143
+ with migration_lock:
144
+ migration_state["progress"]["current_table"] = table_name_full
145
+ manifest = migration_state["progress"].get("manifest")
146
+
147
+ if manifest and manifest.get("chunks"):
148
+ chunk_size = manifest["chunks"].get(table_name_full)
149
+ if chunk_size is not None:
150
+ migration_state["progress"]["chunks_processed"] += 1
151
+ migration_state["progress"]["size_processed_bytes"] += chunk_size
152
+ if migration_state["progress"]["total_chunks"] > 0:
153
+ migration_state["progress"]["percent_complete_by_count"] = round((migration_state["progress"]["chunks_processed"] / migration_state["progress"]["total_chunks"]) * 100, 2)
154
+ if migration_state["progress"]["total_size_bytes"] > 0:
155
+ migration_state["progress"]["percent_complete_by_size"] = round((migration_state["progress"]["size_processed_bytes"] / migration_state["progress"]["total_size_bytes"]) * 100, 2)
156
+ else:
157
+ migration_state["progress"]["tables_completed"] += 1 # A non-chunk table
158
+ else:
159
+ migration_state["progress"]["tables_completed"] += 1 # Fallback mode
160
 
161
+ # --- Core Background Tasks ---
162
 
163
def run_dump(source_conn: str, file_path: str, manifest_path: str, manifest_data: dict, options: dict):
    """Run ``pg_dump`` as a background task and record the outcome in ``migration_state``.

    Args:
        source_conn: Connection string passed to ``pg_dump`` as the database argument.
        file_path: Destination of the dump (``-f``).
        manifest_path: Where ``manifest_data`` is written as JSON after a successful dump.
        manifest_data: Progress manifest collected before the dump started.
        options: Optional settings: ``format`` (default ``"c"``), ``schema`` and
            ``compression`` (``"default"`` or empty means "do not pass ``-Z``").

    The dump counts as successful only when ``pg_dump`` exits with 0 and the
    output file is non-empty. Errors are logged, never raised.
    """
    try:
        # NOTE(review): the connection string (possibly containing a password) is
        # visible in the process list while pg_dump runs.
        cmd = ["pg_dump", source_conn, f"-F{options.get('format', 'c')}", "-v", "-f", file_path]
        schema = options.get("schema")
        if schema:
            cmd.extend(["-n", str(schema)])
        compression = options.get("compression", "default")
        if compression not in (None, "", "default"):
            # JSON clients may send the level as a number; argv entries must be str.
            cmd.extend(["-Z", str(compression)])

        log_message("Starting database dump...", "info")

        # start_new_session puts pg_dump in its own process group, so a
        # group-wide kill issued when stopping cannot hit this server process.
        process = subprocess.Popen(
            cmd,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            start_new_session=True,
        )
        with migration_lock:
            migration_state["process"] = process

        # process_stderr is defined elsewhere in this file; it receives the
        # process whose stderr pipe was opened above.
        stderr_thread = threading.Thread(target=process_stderr, args=(process, "dump"), daemon=True)
        stderr_thread.start()
        exit_code = process.wait()
        stderr_thread.join(timeout=2.0)

        final_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        success = exit_code == 0 and final_size > 0

        with migration_lock:
            end_time = time.time()
            size_mb = final_size / (1024 * 1024)  # MB
            migration_state["dump_file_size"] = size_mb
            migration_state["end_time"] = end_time
            migration_state["running"] = False
            migration_state["process"] = None
            migration_state["dump_completed"] = success
            # Fall back to a zero duration if the state was reset meanwhile.
            start_time = migration_state.get("start_time") or end_time

        # Logging happens outside the lock region: whether log_message takes
        # migration_lock itself is not visible here, so avoid nesting.
        if success:
            # Write manifest file on successful dump
            try:
                with open(manifest_path, 'w') as f:
                    json.dump(manifest_data, f)
                log_message(f"Successfully saved progress manifest to {manifest_path}", "success")
            except Exception as e:
                log_message(f"Could not save progress manifest: {e}", "error")

            total_time = end_time - start_time
            log_message(f"Dump completed successfully. Size: {size_mb:.2f} MB. Time: {total_time:.2f}s", "success")
        else:
            log_message(f"Dump failed. Exit code: {exit_code}, File size: {final_size} bytes.", "error")

    except Exception as e:
        log_message(f"An unexpected error occurred during dump: {e}", "error")
        with migration_lock:
            migration_state["running"] = False
            migration_state["process"] = None
            migration_state["end_time"] = time.time()
209
 
210
def run_restore(target_conn: str, file_path: str, options: dict):
    """Run ``pg_restore`` as a background task and record the outcome in ``migration_state``.

    Args:
        target_conn: Connection string of the database to restore into.
        file_path: Dump file handed to ``pg_restore``.
        options: Optional flags: ``timescaledb_pre_restore`` (default True),
            ``timescaledb_post_restore`` (default True), ``no_owner``
            (default True) and ``clean`` (default False).

    The restore counts as completed only when ``pg_restore`` exits with 0 and
    the optional post-restore step succeeds. Errors are logged, never raised.
    """
    try:
        # Pre-restore step
        if options.get("timescaledb_pre_restore", True):
            pre_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_pre_restore();"]
            pre_restore_proc = subprocess.run(pre_restore_cmd, capture_output=True, text=True)
            if pre_restore_proc.returncode != 0:
                log_message(f"Pre-restore failed: {pre_restore_proc.stderr or pre_restore_proc.stdout}", "error")
                # The caller marked the operation as running before scheduling
                # this task; clear that flag or the state stays "running" forever.
                with migration_lock:
                    migration_state["end_time"] = time.time()
                    migration_state["running"] = False
                    migration_state["process"] = None
                    migration_state["restore_completed"] = False
                return

        cmd = ["pg_restore", "-d", target_conn, "-v"]
        if options.get("no_owner", True):
            cmd.append("--no-owner")
        if options.get("clean", False):
            cmd.append("--clean")
        cmd.append(file_path)

        log_message("Starting database restore...", "info")

        # start_new_session puts pg_restore in its own process group, so a
        # group-wide kill issued when stopping cannot hit this server process.
        process = subprocess.Popen(
            cmd,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            start_new_session=True,
        )
        with migration_lock:
            migration_state["process"] = process

        # process_stderr is defined elsewhere in this file; it receives the
        # process whose stderr pipe was opened above.
        stderr_thread = threading.Thread(target=process_stderr, args=(process, "restore"), daemon=True)
        stderr_thread.start()
        exit_code = process.wait()
        stderr_thread.join(timeout=2.0)

        post_restore_success = True
        # NOTE(review): the post-restore step is skipped when pg_restore fails;
        # confirm the target database should stay in that state on failure.
        if exit_code == 0 and options.get("timescaledb_post_restore", True):
            post_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_post_restore(); ANALYZE;"]
            post_restore_proc = subprocess.run(post_restore_cmd, capture_output=True, text=True)
            if post_restore_proc.returncode != 0:
                log_message(f"Post-restore failed: {post_restore_proc.stderr or post_restore_proc.stdout}", "error")
                post_restore_success = False

        completed = exit_code == 0 and post_restore_success
        with migration_lock:
            end_time = time.time()
            migration_state["end_time"] = end_time
            migration_state["running"] = False
            migration_state["process"] = None
            migration_state["restore_completed"] = completed
            # Fall back to a zero duration if the state was reset meanwhile.
            start_time = migration_state.get("start_time") or end_time

        # Logging happens outside the lock region: whether log_message takes
        # migration_lock itself is not visible here, so avoid nesting.
        if completed:
            total_time = end_time - start_time
            log_message(f"Restore completed successfully. Time: {total_time:.2f}s", "success")
        else:
            log_message(f"Restore failed. Exit code: {exit_code}, Post-restore success: {post_restore_success}", "error")

    except Exception as e:
        log_message(f"An unexpected error occurred during restore: {e}", "error")
        with migration_lock:
            migration_state["running"] = False
            migration_state["process"] = None
            migration_state["end_time"] = time.time()
261
 
262
def stop_current_process():
    """Forcefully stop the dump/restore subprocess recorded in ``migration_state``.

    Returns:
        True when a running process was killed and reaped, False when nothing
        was running or stopping it failed.

    After a stop attempt the state is always marked as not running, whether
    or not the kill succeeded.
    """
    with migration_lock:
        process = migration_state.get("process")
        running = migration_state.get("running")
        operation = migration_state.get("operation")
    if not process or not running:
        return False

    # The kill and the logging run outside the lock region: whether
    # log_message takes migration_lock itself is not visible here, so avoid
    # nesting, and do not block other state readers during the wait below.
    try:
        pid = process.pid
        log_message(f"Attempting to stop {operation} process (PID: {pid})...", "warning")

        killed_group = False
        if hasattr(os, 'killpg') and hasattr(os, 'getpgid'):
            try:
                child_pgid = os.getpgid(pid)
            except ProcessLookupError:
                child_pgid = None  # the child already exited
            # Only signal the whole group when the child leads a group of its
            # own. If it shares this server's group, killpg would SIGKILL the
            # server as well.
            if child_pgid is not None and child_pgid != os.getpgrp():
                os.killpg(child_pgid, signal.SIGKILL)
                killed_group = True
        if not killed_group:
            process.kill()

        process.wait(timeout=5)
        log_message("Process stopped by user.", "warning")
        return True
    except Exception as e:
        log_message(f"Error stopping process: {e}", "error")
        return False
    finally:
        with migration_lock:
            migration_state["running"] = False
            migration_state["process"] = None
            migration_state["end_time"] = time.time()
284
 
 
285
  # --- API Endpoints ---
286
 
287
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render ``index.html`` with the incoming request in the template context."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
290
 
291
def reset_state(operation: str):
    """Reinitialise ``migration_state`` for a new run of ``operation``.

    A new id is generated, the log is emptied, and all progress counters
    return to their starting values.
    """
    blank_progress = {
        "current_table": None,
        "total_chunks": 0,
        "chunks_processed": 0,
        "total_size_bytes": 0,
        "size_processed_bytes": 0,
        "percent_complete_by_size": 0,
        "percent_complete_by_count": 0,
        "manifest_loaded": False,
        "manifest": None,
        "tables_completed": 0,
    }
    with migration_lock:
        migration_state.update(
            id=str(uuid.uuid4()),
            running=False,
            operation=operation,
            start_time=None,
            end_time=None,
            dump_file=None,
            dump_file_size=0,
            dump_completed=False,
            restore_completed=False,
            last_activity=time.time(),
            log=[],
            process=None,
            progress=blank_progress,
        )
306
+
307
def redact_password(conn_str: str) -> str:
    """Return ``conn_str`` with any password replaced by ``********``.

    Handles both connection-string shapes:

    * URI form, ``scheme://user:password@host/...``: only the password is
      masked; scheme, user and host are kept.
    * keyword form, ``... password=secret ...`` (also as a URI query
      parameter), with an optionally single-quoted value.

    Strings without a password are returned unchanged.
    """
    # URI userinfo: the user part stops at the first ':'; the password runs up
    # to the last '@' before the next '/', so an unescaped '@' inside the
    # password does not leak its tail.
    redacted = re.sub(r"(://[^:/?#@\s]*):[^/\s]*@", r"\1:********@", conn_str)
    # keyword=value form; no word boundary, so 'sslpassword=' is masked too.
    redacted = re.sub(
        r"(password\s*=\s*)('(?:[^'\\]|\\.)*'|[^\s&]+)",
        r"\1********",
        redacted,
        flags=re.IGNORECASE,
    )
    return redacted
309
+
310
@app.post("/start-dump")
async def start_dump_endpoint(data: Dict[str, Any], background_tasks: BackgroundTasks):
    """Validate the request, reset global state and schedule ``run_dump``.

    Expects ``source_conn`` and an optional ``options`` mapping in the body.
    A currently running operation is stopped first. Responds with 400 when
    the connection string is missing or fails ``test_connection_logic``.
    """
    # Local import: asyncio is not known to be imported at file level.
    import asyncio

    source_conn = data.get("source_conn")
    options = data.get("options") or {}  # tolerate an explicit null
    if not source_conn or not test_connection_logic(source_conn):
        raise HTTPException(status_code=400, detail="Valid source connection string is required.")

    if migration_state["running"]:
        stop_current_process()
        # Pause before resetting state for the new run, without blocking the
        # event loop the way time.sleep would inside a coroutine.
        await asyncio.sleep(1)

    reset_state("dump")
    manifest_data = get_chunk_manifest(source_conn)

    # Keep only characters that are safe in a file name; fall back to the
    # default when nothing is left, so the dump is never named just ".dump".
    raw_name = str(options.get("filename") or "timescale_backup")
    filename = "".join(c for c in raw_name if c.isalnum() or c in ('_', '-')) or "timescale_backup"
    ext = {"p": ".sql", "d": "", "t": ".tar"}.get(options.get("format", "c"), ".dump")
    dumps_root = Path("dumps").resolve()
    dump_path = dumps_root / f"{filename}{ext}"
    manifest_path = dumps_root / f"{filename}{ext}.manifest.json"

    with migration_lock:
        migration_state["dump_file"] = str(dump_path)
        migration_state["progress"]["manifest"] = manifest_data
        migration_state["progress"]["total_chunks"] = manifest_data["total_chunks"]
        migration_state["progress"]["total_size_bytes"] = manifest_data["total_size_bytes"]
        migration_state["progress"]["manifest_loaded"] = manifest_data["total_chunks"] > 0
        migration_state["start_time"] = time.time()
        migration_state["running"] = True

    background_tasks.add_task(run_dump, source_conn, str(dump_path), str(manifest_path), manifest_data, options)
    return JSONResponse({"success": True, "message": "Dump process initiated."})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
339
 
340
@app.post("/start-restore")
async def start_restore_endpoint(data: Dict[str, Any], background_tasks: BackgroundTasks):
    """Validate the request, reset global state and schedule ``run_restore``.

    Expects ``target_conn``, ``dump_file`` and an optional ``options`` mapping.
    The dump file must exist inside the ``dumps`` directory. If a
    ``<dump>.manifest.json`` file sits next to it, its totals are loaded for
    progress reporting. Responds with 400 on invalid input.
    """
    # Local import: asyncio is not known to be imported at file level.
    import asyncio

    target_conn = data.get("target_conn")
    dump_file = data.get("dump_file")
    options = data.get("options") or {}  # tolerate an explicit null
    if not target_conn or not dump_file or not test_connection_logic(target_conn):
        raise HTTPException(status_code=400, detail="Valid target connection and dump file are required.")

    dumps_root = Path("dumps").resolve()
    dump_path = Path(dump_file).resolve()
    # Compare path components, not string prefixes: a prefix test would also
    # accept sibling directories such as "dumps_other/".
    if not dump_path.exists() or dumps_root not in dump_path.parents:
        raise HTTPException(status_code=400, detail="Invalid or non-existent dump file.")

    if migration_state["running"]:
        stop_current_process()
        # Pause before resetting state for the new run, without blocking the
        # event loop the way time.sleep would inside a coroutine.
        await asyncio.sleep(1)

    reset_state("restore")
    # Derive the manifest location from the validated, resolved path.
    manifest_path = Path(f"{dump_path}.manifest.json")
    manifest_data = None
    if manifest_path.exists():
        try:
            with open(manifest_path, 'r') as f:
                manifest_data = json.load(f)
            log_message("Successfully loaded progress manifest for restore.", "success")
        except Exception as e:
            log_message(f"Could not load manifest file: {e}", "error")

    with migration_lock:
        if isinstance(manifest_data, dict) and manifest_data:
            migration_state["progress"]["manifest"] = manifest_data
            # .get keeps a manifest with missing keys from failing the request.
            migration_state["progress"]["total_chunks"] = manifest_data.get("total_chunks", 0)
            migration_state["progress"]["total_size_bytes"] = manifest_data.get("total_size_bytes", 0)
            migration_state["progress"]["manifest_loaded"] = True
        migration_state["start_time"] = time.time()
        migration_state["running"] = True

    background_tasks.add_task(run_restore, target_conn, str(dump_path), options)
    return JSONResponse({"success": True, "message": "Restore process initiated."})
 
 
 
 
 
 
 
378
 
379
@app.post("/stop-process")
async def stop_process_endpoint():
    """Ask ``stop_current_process`` to stop the running operation and report the result."""
    stopped = stop_current_process()
    if stopped:
        message = "Process stop initiated."
    else:
        message = "No process was running to stop."
    return JSONResponse({"success": bool(stopped), "message": message})
 
 
 
 
 
 
 
 
 
384
 
385
@app.get("/status")
async def get_status():
    """Return a JSON snapshot of ``migration_state``.

    The ``process`` handle is left out and the manifest is blanked in the
    snapshot only, to avoid sending a large object.
    """
    with migration_lock:
        state_copy = {k: v for k, v in migration_state.items() if k != "process"}
        progress = state_copy.get("progress")
        if isinstance(progress, dict):
            # Copy before editing: the outer copy is shallow, so writing to the
            # shared "progress" dict would erase the live manifest in
            # migration_state on the first status poll.
            progress = dict(progress)
            if "manifest" in progress:
                progress["manifest"] = None  # Avoid sending large object
            state_copy["progress"] = progress
        if isinstance(state_copy.get("log"), list):
            # Snapshot the log so it can be serialised outside the lock.
            state_copy["log"] = list(state_copy["log"])
    return JSONResponse(content=state_copy)
392
 
393
@app.get("/list-dumps")
async def list_dumps():
    """List the files in ``dumps`` (manifest files excluded), newest first.

    Each entry carries the file name, its path, its size in MB and its
    modification time. Files that cannot be stat'ed are logged and skipped.
    """
    entries = []
    for entry in Path("dumps").iterdir():
        if not entry.is_file() or entry.name.endswith('.manifest.json'):
            continue
        try:
            info = entry.stat()
            modified = datetime.datetime.fromtimestamp(info.st_mtime)
            entries.append({
                "name": entry.name,
                "path": str(entry),
                "size_mb": info.st_size / (1024*1024),
                "date": modified.strftime("%Y-%m-%d %H:%M:%S"),
            })
        except OSError as e:
            logger.error(f"Could not stat file {entry.name}: {e}")
    # The date strings are zero-padded, so sorting them as text is chronological.
    newest_first = sorted(entries, key=lambda item: item["date"], reverse=True)
    return JSONResponse({"success": True, "dumps": newest_first})
 
 
 
 
 
 
 
 
 
409
 
410
@app.get("/downloads/{file_name:path}")
async def download_file(file_name: str):
    """Serve a file from the ``dumps`` directory.

    Responds with 404 when the resolved path is outside ``dumps`` or is not
    a regular file.
    """
    dumps_root = Path("dumps").resolve()
    file_path = (dumps_root / file_name).resolve()
    # Compare path components, not string prefixes: a prefix test would also
    # accept sibling directories such as "dumps_other/". is_file() rejects
    # directories, including the dumps directory itself.
    if dumps_root not in file_path.parents or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    # Use the bare file name for the download, without any sub-path.
    return FileResponse(path=str(file_path), filename=file_path.name)
 
 
 
 
 
 
 
416
 
417
@app.post("/test-connection")
async def test_connection_endpoint(data: Dict[str, str]):
    """Check a connection string and report basic facts about the server.

    Returns the server version, the current database name and whether the
    ``timescaledb`` extension is installed (with its version). Responds with
    400 when the connection cannot be established and 500 when a query fails.
    """
    connection_string = data.get("connection_string")
    if not connection_string or not test_connection_logic(connection_string):
        raise HTTPException(status_code=400, detail="Failed to connect to database")

    conn = None
    try:
        conn = psycopg2.connect(connection_string)
        with conn.cursor() as cur:
            cur.execute("SELECT version()")
            version = cur.fetchone()[0]
            cur.execute("SELECT current_database()")
            database = cur.fetchone()[0]
            # One lookup answers both questions: a row means the extension is
            # installed, and the row holds its version.
            cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'")
            ext_row = cur.fetchone()
        is_timescaledb = ext_row is not None
        ts_version = ext_row[0] if ext_row else None
        return JSONResponse({
            "success": True, "version": version, "is_timescaledb": is_timescaledb,
            "timescaledb_version": ts_version, "database": database
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
    finally:
        # psycopg2's "with connection" only ends the transaction; it does not
        # close the connection, so close it explicitly.
        if conn is not None:
            conn.close()
441
+
442
@app.post("/database-info")
async def get_database_info(data: Dict[str, str]):
    """Report the number of user tables and the size of the connected database.

    Tables in the PostgreSQL system schemas and the TimescaleDB internal
    schemas named in the query are not counted. Responds with 400 when the
    connection cannot be established and 500 when a query fails.
    """
    connection_string = data.get("connection_string")
    if not connection_string or not test_connection_logic(connection_string):
        raise HTTPException(status_code=400, detail="Connection failed")

    conn = None
    try:
        conn = psycopg2.connect(connection_string)
        with conn.cursor() as cur:
            cur.execute("""
                SELECT count(*) FROM information_schema.tables
                WHERE table_schema NOT IN ('pg_catalog', 'information_schema', '_timescaledb_internal', '_timescaledb_catalog', '_timescaledb_cache')
                AND table_type = 'BASE TABLE'
            """)
            table_count = cur.fetchone()[0]
            cur.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
            db_size = cur.fetchone()[0]
        return JSONResponse({"success": True, "table_count": table_count, "database_size": db_size})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Database query error: {e}") from e
    finally:
        # psycopg2's "with connection" only ends the transaction; it does not
        # close the connection, so close it explicitly.
        if conn is not None:
            conn.close()
461
 
462
  if __name__ == "__main__":
463
  import uvicorn