dippoo Claude Opus 4.6 committed on
Commit
3aa914c
·
1 Parent(s): b341f22

Fix event loop blocking: make all SSH/SFTP calls async

Browse files

All _ssh_exec calls now use asyncio.to_thread to avoid blocking
the web server during training setup. SFTP put/get also wrapped.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

src/content_engine/services/runpod_trainer.py CHANGED
@@ -304,7 +304,7 @@ class RunPodTrainer:
304
 
305
  # If using network volume, symlink to /workspace so all paths work
306
  if NETWORK_VOLUME_ID:
307
- self._ssh_exec(ssh, "mkdir -p /runpod-volume/models && rm -rf /workspace/models 2>/dev/null; ln -sf /runpod-volume/models /workspace/models")
308
  job._log("Network volume symlinked to /workspace")
309
 
310
  # Enable keepalive to prevent SSH timeout during uploads
@@ -324,7 +324,7 @@ class RunPodTrainer:
324
  tmp_dir = Path(tempfile.mkdtemp(prefix="lora_upload_"))
325
 
326
  folder_name = f"10_{trigger_word or 'character'}"
327
- self._ssh_exec(ssh, f"mkdir -p /workspace/dataset/{folder_name}")
328
  for i, img_path in enumerate(image_paths):
329
  p = Path(img_path)
330
  if p.exists():
@@ -342,7 +342,7 @@ class RunPodTrainer:
342
  remote_path = f"/workspace/dataset/{folder_name}/{remote_name}"
343
  for attempt in range(3):
344
  try:
345
- sftp.put(str(upload_path), remote_path)
346
  break
347
  except (EOFError, OSError):
348
  if attempt == 2:
@@ -355,12 +355,14 @@ class RunPodTrainer:
355
  local_caption = p.with_suffix(".txt")
356
  if local_caption.exists():
357
  remote_caption = f"/workspace/dataset/{folder_name}/{p.stem}.txt"
358
- sftp.put(str(local_caption), remote_caption)
359
  else:
360
  # Fallback: create caption from trigger word
361
  remote_caption = f"/workspace/dataset/{folder_name}/{p.stem}.txt"
362
- with sftp.open(remote_caption, "w") as f:
363
- f.write(trigger_word or "")
 
 
364
  job._log(f"Uploaded {i+1}/{len(image_paths)}: {p.name}")
365
 
366
  # Cleanup temp compressed images
@@ -381,18 +383,18 @@ class RunPodTrainer:
381
  install_cmds = []
382
 
383
  # Check if already present in workspace
384
- tuner_exist = self._ssh_exec(ssh, f"test -f {tuner_dir}/pyproject.toml && echo EXISTS || echo MISSING").strip()
385
  if tuner_exist == "EXISTS":
386
  job._log("musubi-tuner found in workspace")
387
  else:
388
  # Check volume cache
389
- vol_exist = self._ssh_exec(ssh, "test -f /runpod-volume/musubi-tuner/pyproject.toml && echo EXISTS || echo MISSING").strip()
390
  if vol_exist == "EXISTS":
391
  job._log("Restoring musubi-tuner from volume cache...")
392
- self._ssh_exec(ssh, f"rm -rf {tuner_dir} 2>/dev/null; cp -r /runpod-volume/musubi-tuner {tuner_dir}")
393
  else:
394
  job._log("Cloning musubi-tuner from GitHub...")
395
- self._ssh_exec(ssh, f"rm -rf {tuner_dir} /runpod-volume/musubi-tuner 2>/dev/null; true")
396
  install_cmds.append(f"cd /workspace && git clone --depth 1 https://github.com/kohya-ss/musubi-tuner.git")
397
  # Save to volume for future pods
398
  if NETWORK_VOLUME_ID:
@@ -406,7 +408,7 @@ class RunPodTrainer:
406
  ])
407
  else:
408
  # SD 1.5 / SDXL / FLUX.1 use sd-scripts
409
- scripts_exist = self._ssh_exec(ssh, "test -f /workspace/sd-scripts/setup.py && echo EXISTS || echo MISSING").strip()
410
  if scripts_exist == "EXISTS":
411
  job._log("Kohya sd-scripts already cached on volume, updating...")
412
  install_cmds = [
@@ -423,7 +425,7 @@ class RunPodTrainer:
423
  "pip install accelerate lion-pytorch prodigyopt safetensors bitsandbytes xformers 2>&1 | tail -1",
424
  ])
425
  for cmd in install_cmds:
426
- out = self._ssh_exec(ssh, cmd, timeout=600)
427
  job._log(out[:200] if out else "done")
428
 
429
  # Download base model from HuggingFace (skip if already on network volume)
@@ -432,7 +434,7 @@ class RunPodTrainer:
432
  model_name = model_cfg.get("name", job.base_model)
433
 
434
  job.progress = 0.1
435
- self._ssh_exec(ssh, """pip install huggingface_hub 2>&1 | tail -1""", timeout=120)
436
 
437
  if model_type == "flux2":
438
  # FLUX.2 models are stored in a directory structure on the volume
@@ -441,9 +443,9 @@ class RunPodTrainer:
441
  vae_path = f"{flux2_dir}/ae.safetensors" # Original BFL format (not diffusers)
442
  te_path = f"{flux2_dir}/text_encoder/model-00001-of-00010.safetensors"
443
 
444
- dit_exists = self._ssh_exec(ssh, f"test -f {dit_path} && echo EXISTS || echo MISSING").strip()
445
- vae_exists = self._ssh_exec(ssh, f"test -f {vae_path} && echo EXISTS || echo MISSING").strip()
446
- te_exists = self._ssh_exec(ssh, f"test -f {te_path} && echo EXISTS || echo MISSING").strip()
447
 
448
  if dit_exists != "EXISTS" or te_exists != "EXISTS":
449
  missing = []
@@ -456,14 +458,14 @@ class RunPodTrainer:
456
  # Download ae.safetensors (original format VAE) if not present
457
  if vae_exists != "EXISTS":
458
  job._log("Downloading FLUX.2 VAE (ae.safetensors, 336MB)...")
459
- self._ssh_exec(ssh, """pip install huggingface_hub 2>&1 | tail -1""", timeout=120)
460
- self._ssh_exec(ssh, f"""python -c "
461
  from huggingface_hub import hf_hub_download
462
  hf_hub_download('black-forest-labs/FLUX.2-dev', 'ae.safetensors', local_dir='{flux2_dir}')
463
  print('Downloaded ae.safetensors')
464
  " 2>&1 | tail -5""", timeout=600)
465
  # Verify download
466
- vae_check = self._ssh_exec(ssh, f"test -f {vae_path} && echo EXISTS || echo MISSING").strip()
467
  if vae_check != "EXISTS":
468
  raise RuntimeError("Failed to download ae.safetensors")
469
  job._log("VAE downloaded")
@@ -472,12 +474,12 @@ print('Downloaded ae.safetensors')
472
 
473
  else:
474
  # SD 1.5 / SDXL / FLUX.1 — download single model file
475
- model_exists = self._ssh_exec(ssh, f"test -f /workspace/models/{hf_filename} && echo EXISTS || echo MISSING").strip()
476
  if model_exists == "EXISTS":
477
  job._log(f"Base model already cached on volume: {model_name}")
478
  else:
479
  job._log(f"Downloading base model: {model_name}...")
480
- self._ssh_exec(ssh, f"""
481
  python -c "
482
  from huggingface_hub import hf_hub_download
483
  hf_hub_download('{hf_repo}', '{hf_filename}', local_dir='/workspace/models')
@@ -486,13 +488,13 @@ hf_hub_download('{hf_repo}', '{hf_filename}', local_dir='/workspace/models')
486
 
487
  # For FLUX.1, download additional required models (CLIP, T5, VAE)
488
  if model_type == "flux":
489
- flux_files_check = self._ssh_exec(ssh, "test -f /workspace/models/clip_l.safetensors && test -f /workspace/models/t5xxl_fp16.safetensors && test -f /workspace/models/ae.safetensors && echo EXISTS || echo MISSING").strip()
490
  if flux_files_check == "EXISTS":
491
  job._log("FLUX.1 auxiliary models already cached on volume")
492
  else:
493
  job._log("Downloading FLUX.1 auxiliary models (CLIP, T5, VAE)...")
494
  job.progress = 0.12
495
- self._ssh_exec(ssh, """
496
  python -c "
497
  from huggingface_hub import hf_hub_download
498
  hf_hub_download('comfyanonymous/flux_text_encoders', 'clip_l.safetensors', local_dir='/workspace/models')
@@ -523,7 +525,7 @@ batch_size = 1
523
  num_repeats = 10
524
  resolution = [{resolution}, {resolution}]
525
  """
526
- self._ssh_exec(ssh, f"cat > /workspace/dataset.toml << 'TOMLEOF'\n{toml_content}TOMLEOF")
527
  job._log("Created dataset.toml config")
528
 
529
  # musubi-tuner requires pre-caching latents and text encoder outputs
@@ -542,13 +544,13 @@ resolution = [{resolution}, {resolution}]
542
  f" --vae_dtype bfloat16"
543
  f" 2>&1 | tee /tmp/cache_latents.log; echo EXIT_CODE=${{PIPESTATUS[0]}}"
544
  )
545
- out = self._ssh_exec(ssh, cache_latents_cmd, timeout=600)
546
  # Get last lines which have the real error
547
  last_lines = out.split('\n')[-30:]
548
  job._log('\n'.join(last_lines))
549
  if "EXIT_CODE=0" not in out:
550
  # Fetch the full error log
551
- err_log = self._ssh_exec(ssh, "grep -i 'error\\|exception\\|traceback\\|failed' /tmp/cache_latents.log | tail -10")
552
  job._log(f"Cache error details: {err_log}")
553
  raise RuntimeError(f"Latent caching failed")
554
 
@@ -564,7 +566,7 @@ resolution = [{resolution}, {resolution}]
564
  f" --batch_size 1"
565
  f" 2>&1; echo EXIT_CODE=$?"
566
  )
567
- out = self._ssh_exec(ssh, cache_te_cmd, timeout=600)
568
  job._log(out[-500:] if out else "done")
569
  if "EXIT_CODE=0" not in out:
570
  raise RuntimeError(f"Text encoder caching failed: {out[-200:]}")
@@ -598,7 +600,7 @@ resolution = [{resolution}, {resolution}]
598
  last_flush = time.time()
599
  while not channel.exit_status_ready() or channel.recv_ready():
600
  if channel.recv_ready():
601
- chunk = channel.recv(4096).decode("utf-8", errors="replace")
602
  buffer += chunk
603
  # Process complete lines (handle both \n and \r for tqdm progress)
604
  while "\n" in buffer or "\r" in buffer:
@@ -640,18 +642,18 @@ resolution = [{resolution}, {resolution}]
640
 
641
  # First, copy to network volume for persistence
642
  job._log("Saving LoRA to network volume...")
643
- self._ssh_exec(ssh, "mkdir -p /runpod-volume/loras")
644
  remote_output = f"/workspace/output/{name}.safetensors"
645
  # Find the output file
646
- check = self._ssh_exec(ssh, f"test -f {remote_output} && echo EXISTS || echo MISSING").strip()
647
  if check == "MISSING":
648
- remote_files = self._ssh_exec(ssh, "ls /workspace/output/*.safetensors 2>/dev/null").strip()
649
  if remote_files:
650
  remote_output = remote_files.split("\n")[-1].strip()
651
  else:
652
  raise RuntimeError("No .safetensors output found")
653
 
654
- self._ssh_exec(ssh, f"cp {remote_output} /runpod-volume/loras/{name}.safetensors")
655
  job._log(f"LoRA saved to volume: /runpod-volume/loras/{name}.safetensors")
656
 
657
  # Download locally (skip on HF Spaces — limited storage)
@@ -662,7 +664,7 @@ resolution = [{resolution}, {resolution}]
662
  job._log("Downloading LoRA to local machine...")
663
  LORA_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
664
  local_path = LORA_OUTPUT_DIR / f"{name}.safetensors"
665
- sftp.get(remote_output, str(local_path))
666
  job.output_path = str(local_path)
667
  job._log(f"LoRA saved locally to {local_path}")
668
 
@@ -982,8 +984,8 @@ resolution = [{resolution}, {resolution}]
982
 
983
  raise RuntimeError(f"Pod did not become ready within {timeout}s")
984
 
985
- def _ssh_exec(self, ssh, cmd: str, timeout: int = 120) -> str:
986
- """Execute a command over SSH and return stdout."""
987
  _, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
988
  out = stdout.read().decode("utf-8", errors="replace")
989
  err = stderr.read().decode("utf-8", errors="replace")
@@ -992,6 +994,10 @@ resolution = [{resolution}, {resolution}]
992
  logger.warning("SSH cmd failed (code %d): %s\nstderr: %s", exit_code, cmd[:100], err[:500])
993
  return out.strip()
994
 
 
 
 
 
995
  def _parse_progress(self, job: CloudTrainingJob, line: str):
996
  """Parse Kohya training output for progress info."""
997
  lower = line.lower()
 
304
 
305
  # If using network volume, symlink to /workspace so all paths work
306
  if NETWORK_VOLUME_ID:
307
+ await self._ssh_exec(ssh, "mkdir -p /runpod-volume/models && rm -rf /workspace/models 2>/dev/null; ln -sf /runpod-volume/models /workspace/models")
308
  job._log("Network volume symlinked to /workspace")
309
 
310
  # Enable keepalive to prevent SSH timeout during uploads
 
324
  tmp_dir = Path(tempfile.mkdtemp(prefix="lora_upload_"))
325
 
326
  folder_name = f"10_{trigger_word or 'character'}"
327
+ await self._ssh_exec(ssh, f"mkdir -p /workspace/dataset/{folder_name}")
328
  for i, img_path in enumerate(image_paths):
329
  p = Path(img_path)
330
  if p.exists():
 
342
  remote_path = f"/workspace/dataset/{folder_name}/{remote_name}"
343
  for attempt in range(3):
344
  try:
345
+ await asyncio.to_thread(sftp.put, str(upload_path), remote_path)
346
  break
347
  except (EOFError, OSError):
348
  if attempt == 2:
 
355
  local_caption = p.with_suffix(".txt")
356
  if local_caption.exists():
357
  remote_caption = f"/workspace/dataset/{folder_name}/{p.stem}.txt"
358
+ await asyncio.to_thread(sftp.put, str(local_caption), remote_caption)
359
  else:
360
  # Fallback: create caption from trigger word
361
  remote_caption = f"/workspace/dataset/{folder_name}/{p.stem}.txt"
362
+ def _write_caption():
363
+ with sftp.open(remote_caption, "w") as f:
364
+ f.write(trigger_word or "")
365
+ await asyncio.to_thread(_write_caption)
366
  job._log(f"Uploaded {i+1}/{len(image_paths)}: {p.name}")
367
 
368
  # Cleanup temp compressed images
 
383
  install_cmds = []
384
 
385
  # Check if already present in workspace
386
+ tuner_exist = await self._ssh_exec(ssh, f"test -f {tuner_dir}/pyproject.toml && echo EXISTS || echo MISSING").strip()
387
  if tuner_exist == "EXISTS":
388
  job._log("musubi-tuner found in workspace")
389
  else:
390
  # Check volume cache
391
+ vol_exist = await self._ssh_exec(ssh, "test -f /runpod-volume/musubi-tuner/pyproject.toml && echo EXISTS || echo MISSING").strip()
392
  if vol_exist == "EXISTS":
393
  job._log("Restoring musubi-tuner from volume cache...")
394
+ await self._ssh_exec(ssh, f"rm -rf {tuner_dir} 2>/dev/null; cp -r /runpod-volume/musubi-tuner {tuner_dir}")
395
  else:
396
  job._log("Cloning musubi-tuner from GitHub...")
397
+ await self._ssh_exec(ssh, f"rm -rf {tuner_dir} /runpod-volume/musubi-tuner 2>/dev/null; true")
398
  install_cmds.append(f"cd /workspace && git clone --depth 1 https://github.com/kohya-ss/musubi-tuner.git")
399
  # Save to volume for future pods
400
  if NETWORK_VOLUME_ID:
 
408
  ])
409
  else:
410
  # SD 1.5 / SDXL / FLUX.1 use sd-scripts
411
+ scripts_exist = await self._ssh_exec(ssh, "test -f /workspace/sd-scripts/setup.py && echo EXISTS || echo MISSING").strip()
412
  if scripts_exist == "EXISTS":
413
  job._log("Kohya sd-scripts already cached on volume, updating...")
414
  install_cmds = [
 
425
  "pip install accelerate lion-pytorch prodigyopt safetensors bitsandbytes xformers 2>&1 | tail -1",
426
  ])
427
  for cmd in install_cmds:
428
+ out = await self._ssh_exec(ssh, cmd, timeout=600)
429
  job._log(out[:200] if out else "done")
430
 
431
  # Download base model from HuggingFace (skip if already on network volume)
 
434
  model_name = model_cfg.get("name", job.base_model)
435
 
436
  job.progress = 0.1
437
+ await self._ssh_exec(ssh, """pip install huggingface_hub 2>&1 | tail -1""", timeout=120)
438
 
439
  if model_type == "flux2":
440
  # FLUX.2 models are stored in a directory structure on the volume
 
443
  vae_path = f"{flux2_dir}/ae.safetensors" # Original BFL format (not diffusers)
444
  te_path = f"{flux2_dir}/text_encoder/model-00001-of-00010.safetensors"
445
 
446
+ dit_exists = await self._ssh_exec(ssh, f"test -f {dit_path} && echo EXISTS || echo MISSING").strip()
447
+ vae_exists = await self._ssh_exec(ssh, f"test -f {vae_path} && echo EXISTS || echo MISSING").strip()
448
+ te_exists = await self._ssh_exec(ssh, f"test -f {te_path} && echo EXISTS || echo MISSING").strip()
449
 
450
  if dit_exists != "EXISTS" or te_exists != "EXISTS":
451
  missing = []
 
458
  # Download ae.safetensors (original format VAE) if not present
459
  if vae_exists != "EXISTS":
460
  job._log("Downloading FLUX.2 VAE (ae.safetensors, 336MB)...")
461
+ await self._ssh_exec(ssh, """pip install huggingface_hub 2>&1 | tail -1""", timeout=120)
462
+ await self._ssh_exec(ssh, f"""python -c "
463
  from huggingface_hub import hf_hub_download
464
  hf_hub_download('black-forest-labs/FLUX.2-dev', 'ae.safetensors', local_dir='{flux2_dir}')
465
  print('Downloaded ae.safetensors')
466
  " 2>&1 | tail -5""", timeout=600)
467
  # Verify download
468
+ vae_check = await self._ssh_exec(ssh, f"test -f {vae_path} && echo EXISTS || echo MISSING").strip()
469
  if vae_check != "EXISTS":
470
  raise RuntimeError("Failed to download ae.safetensors")
471
  job._log("VAE downloaded")
 
474
 
475
  else:
476
  # SD 1.5 / SDXL / FLUX.1 — download single model file
477
+ model_exists = await self._ssh_exec(ssh, f"test -f /workspace/models/{hf_filename} && echo EXISTS || echo MISSING").strip()
478
  if model_exists == "EXISTS":
479
  job._log(f"Base model already cached on volume: {model_name}")
480
  else:
481
  job._log(f"Downloading base model: {model_name}...")
482
+ await self._ssh_exec(ssh, f"""
483
  python -c "
484
  from huggingface_hub import hf_hub_download
485
  hf_hub_download('{hf_repo}', '{hf_filename}', local_dir='/workspace/models')
 
488
 
489
  # For FLUX.1, download additional required models (CLIP, T5, VAE)
490
  if model_type == "flux":
491
+ flux_files_check = await self._ssh_exec(ssh, "test -f /workspace/models/clip_l.safetensors && test -f /workspace/models/t5xxl_fp16.safetensors && test -f /workspace/models/ae.safetensors && echo EXISTS || echo MISSING").strip()
492
  if flux_files_check == "EXISTS":
493
  job._log("FLUX.1 auxiliary models already cached on volume")
494
  else:
495
  job._log("Downloading FLUX.1 auxiliary models (CLIP, T5, VAE)...")
496
  job.progress = 0.12
497
+ await self._ssh_exec(ssh, """
498
  python -c "
499
  from huggingface_hub import hf_hub_download
500
  hf_hub_download('comfyanonymous/flux_text_encoders', 'clip_l.safetensors', local_dir='/workspace/models')
 
525
  num_repeats = 10
526
  resolution = [{resolution}, {resolution}]
527
  """
528
+ await self._ssh_exec(ssh, f"cat > /workspace/dataset.toml << 'TOMLEOF'\n{toml_content}TOMLEOF")
529
  job._log("Created dataset.toml config")
530
 
531
  # musubi-tuner requires pre-caching latents and text encoder outputs
 
544
  f" --vae_dtype bfloat16"
545
  f" 2>&1 | tee /tmp/cache_latents.log; echo EXIT_CODE=${{PIPESTATUS[0]}}"
546
  )
547
+ out = await self._ssh_exec(ssh, cache_latents_cmd, timeout=600)
548
  # Get last lines which have the real error
549
  last_lines = out.split('\n')[-30:]
550
  job._log('\n'.join(last_lines))
551
  if "EXIT_CODE=0" not in out:
552
  # Fetch the full error log
553
+ err_log = await self._ssh_exec(ssh, "grep -i 'error\\|exception\\|traceback\\|failed' /tmp/cache_latents.log | tail -10")
554
  job._log(f"Cache error details: {err_log}")
555
  raise RuntimeError(f"Latent caching failed")
556
 
 
566
  f" --batch_size 1"
567
  f" 2>&1; echo EXIT_CODE=$?"
568
  )
569
+ out = await self._ssh_exec(ssh, cache_te_cmd, timeout=600)
570
  job._log(out[-500:] if out else "done")
571
  if "EXIT_CODE=0" not in out:
572
  raise RuntimeError(f"Text encoder caching failed: {out[-200:]}")
 
600
  last_flush = time.time()
601
  while not channel.exit_status_ready() or channel.recv_ready():
602
  if channel.recv_ready():
603
+ chunk = (await asyncio.to_thread(channel.recv, 4096)).decode("utf-8", errors="replace")
604
  buffer += chunk
605
  # Process complete lines (handle both \n and \r for tqdm progress)
606
  while "\n" in buffer or "\r" in buffer:
 
642
 
643
  # First, copy to network volume for persistence
644
  job._log("Saving LoRA to network volume...")
645
+ await self._ssh_exec(ssh, "mkdir -p /runpod-volume/loras")
646
  remote_output = f"/workspace/output/{name}.safetensors"
647
  # Find the output file
648
+ check = await self._ssh_exec(ssh, f"test -f {remote_output} && echo EXISTS || echo MISSING").strip()
649
  if check == "MISSING":
650
+ remote_files = await self._ssh_exec(ssh, "ls /workspace/output/*.safetensors 2>/dev/null").strip()
651
  if remote_files:
652
  remote_output = remote_files.split("\n")[-1].strip()
653
  else:
654
  raise RuntimeError("No .safetensors output found")
655
 
656
+ await self._ssh_exec(ssh, f"cp {remote_output} /runpod-volume/loras/{name}.safetensors")
657
  job._log(f"LoRA saved to volume: /runpod-volume/loras/{name}.safetensors")
658
 
659
  # Download locally (skip on HF Spaces — limited storage)
 
664
  job._log("Downloading LoRA to local machine...")
665
  LORA_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
666
  local_path = LORA_OUTPUT_DIR / f"{name}.safetensors"
667
+ await asyncio.to_thread(sftp.get, remote_output, str(local_path))
668
  job.output_path = str(local_path)
669
  job._log(f"LoRA saved locally to {local_path}")
670
 
 
984
 
985
  raise RuntimeError(f"Pod did not become ready within {timeout}s")
986
 
987
+ def _ssh_exec_sync(self, ssh, cmd: str, timeout: int = 120) -> str:
988
+ """Execute a command over SSH and return stdout (blocking)."""
989
  _, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
990
  out = stdout.read().decode("utf-8", errors="replace")
991
  err = stderr.read().decode("utf-8", errors="replace")
 
994
  logger.warning("SSH cmd failed (code %d): %s\nstderr: %s", exit_code, cmd[:100], err[:500])
995
  return out.strip()
996
 
997
+ async def _ssh_exec(self, ssh, cmd: str, timeout: int = 120) -> str:
998
+ """Execute a command over SSH without blocking the event loop."""
999
+ return await asyncio.to_thread(self._ssh_exec_sync, ssh, cmd, timeout)
1000
+
1001
  def _parse_progress(self, job: CloudTrainingJob, line: str):
1002
  """Parse Kohya training output for progress info."""
1003
  lower = line.lower()