dippoo Claude Opus 4.6 committed on
Commit
b4ceba9
·
1 Parent(s): 3aa914c

Run training in detached nohup process (survives SSH/server disconnect)

Browse files

Training now runs via nohup, with its output redirected to /tmp/training.log.
The monitor reads that log file instead of the SSH channel's stdout.
This keeps the training run safe against HF Space rebuilds, SSH drops, and browser closes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

src/content_engine/services/runpod_trainer.py CHANGED
@@ -589,50 +589,57 @@ resolution = [{resolution}, {resolution}]
589
  gpu_type=job.gpu_type,
590
  )
591
 
592
- # Execute training and stream output
593
- job._log("Training command submitted...")
594
- transport = ssh.get_transport()
595
- channel = transport.open_session()
596
- channel.exec_command(train_cmd)
597
-
598
- # Read output progressively
599
- buffer = ""
600
- last_flush = time.time()
601
- while not channel.exit_status_ready() or channel.recv_ready():
602
- if channel.recv_ready():
603
- chunk = (await asyncio.to_thread(channel.recv, 4096)).decode("utf-8", errors="replace")
604
- buffer += chunk
605
- # Process complete lines (handle both \n and \r for tqdm progress)
606
- while "\n" in buffer or "\r" in buffer:
607
- # Split on whichever comes first
608
- n_pos = buffer.find("\n")
609
- r_pos = buffer.find("\r")
610
- if n_pos == -1:
611
- split_pos = r_pos
612
- elif r_pos == -1:
613
- split_pos = n_pos
614
- else:
615
- split_pos = min(n_pos, r_pos)
616
- line = buffer[:split_pos].strip()
617
- buffer = buffer[split_pos + 1:]
618
- if not line:
619
- continue
620
- job._log(line)
621
- self._parse_progress(job, line)
622
- self._schedule_db_save(job)
623
- else:
624
- # Periodically flush buffer for partial tqdm lines
625
- if buffer.strip() and time.time() - last_flush > 10:
626
- job._log(buffer.strip())
627
- self._parse_progress(job, buffer.strip())
628
- buffer = ""
629
- last_flush = time.time()
 
 
 
 
 
 
 
 
630
  self._schedule_db_save(job)
631
- await asyncio.sleep(2)
 
632
 
633
- exit_code = channel.recv_exit_status()
634
- if exit_code != 0:
635
- raise RuntimeError(f"Training failed with exit code {exit_code}")
636
 
637
  job._log("Training completed on RunPod!")
638
  job.progress = 0.9
 
589
  gpu_type=job.gpu_type,
590
  )
591
 
592
+ # Execute training in a detached process (survives SSH disconnect)
593
+ job._log("Starting training (detached — survives disconnects)...")
594
+ log_file = "/tmp/training.log"
595
+ pid_file = "/tmp/training.pid"
596
+ exit_file = "/tmp/training.exit"
597
+ await self._ssh_exec(ssh, f"rm -f {log_file} {exit_file}")
598
+ # Run in background with nohup, redirect output to log, save PID and exit code
599
+ detached_cmd = (
600
+ f"nohup bash -c '{train_cmd} > {log_file} 2>&1; echo $? > {exit_file}' &\n"
601
+ f"echo $! > {pid_file}"
602
+ )
603
+ await self._ssh_exec(ssh, detached_cmd, timeout=10)
604
+ await asyncio.sleep(2)
605
+ pid = (await self._ssh_exec(ssh, f"cat {pid_file} 2>/dev/null")).strip()
606
+ job._log(f"Training PID: {pid}")
607
+
608
+ # Monitor the log file (reconnect-safe)
609
+ last_offset = 0
610
+ while True:
611
+ # Check if training finished
612
+ exit_check = (await self._ssh_exec(ssh, f"cat {exit_file} 2>/dev/null")).strip()
613
+ if exit_check:
614
+ exit_code = int(exit_check)
615
+ # Read remaining log
616
+ remaining = (await self._ssh_exec(ssh, f"tail -c +{last_offset + 1} {log_file} 2>/dev/null", timeout=30))
617
+ if remaining:
618
+ for line in remaining.split("\n"):
619
+ line = line.strip()
620
+ if line:
621
+ job._log(line)
622
+ self._parse_progress(job, line)
623
+ if exit_code != 0:
624
+ raise RuntimeError(f"Training failed with exit code {exit_code}")
625
+ break
626
+
627
+ # Read new log output
628
+ try:
629
+ new_output = (await self._ssh_exec(ssh, f"tail -c +{last_offset + 1} {log_file} 2>/dev/null", timeout=30))
630
+ if new_output:
631
+ last_offset += len(new_output.encode("utf-8"))
632
+ for line in new_output.replace("\r", "\n").split("\n"):
633
+ line = line.strip()
634
+ if not line:
635
+ continue
636
+ job._log(line)
637
+ self._parse_progress(job, line)
638
  self._schedule_db_save(job)
639
+ except Exception:
640
+ job._log("Log read failed, retrying...")
641
 
642
+ await asyncio.sleep(5)
 
 
643
 
644
  job._log("Training completed on RunPod!")
645
  job.progress = 0.9