File size: 50,024 Bytes
7344bef | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 | from __future__ import annotations
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
import gradio as gr
from shared.utils.hdr import VIDEO_PROMPT_HDR_OUTPUT_FLAG
from .chunk_executor import ChunkExecutor, ChunkProgress, ProcessContext
from . import common
from . import continuation_recovery
from . import frame_planning as frames
from . import media_io as media
from . import output_paths
from . import process_catalog as catalog
from . import process_metadata
from . import prompt_schedule as prompts
from . import status_ui
from . import video_buffers as video
from .mux_session import MuxSession
from .run_preparation import ProcessInfoExit, prepare_run
@dataclass(frozen=True)
class RunRequest:
state: dict | None = None
process_name: str = ""
user_refs: list[str] = field(default_factory=list)
source_path: str = ""
process_strength: object = None
output_path: str = ""
prompt_text: str = ""
continue_enabled: bool = True
source_audio_track: str = ""
output_resolution: str = "720p"
target_ratio: str = ""
chunk_size_seconds: object = 10.0
sliding_window_overlap: object = 1
start_seconds: str = ""
end_seconds: str = ""
@classmethod
def from_gradio(
cls,
state=None,
process_name="",
user_refs=None,
source_path="",
process_strength=None,
output_path="",
prompt_text="",
continue_enabled=True,
source_audio_track="",
output_resolution="720p",
target_ratio="",
chunk_size_seconds=10.0,
sliding_window_overlap=1,
start_seconds="",
end_seconds="",
) -> "RunRequest":
return cls(
state=state,
process_name=str(process_name or "").strip(),
user_refs=list(user_refs or []),
source_path=str(source_path or "").strip(),
process_strength=process_strength,
output_path=str(output_path or "").strip(),
prompt_text=str(prompt_text or ""),
continue_enabled=bool(continue_enabled),
source_audio_track=str(source_audio_track or "").strip(),
output_resolution=str(output_resolution or "720p").strip(),
target_ratio=str(target_ratio or "").strip(),
chunk_size_seconds=chunk_size_seconds,
sliding_window_overlap=sliding_window_overlap,
start_seconds="" if start_seconds in (None, "") else str(start_seconds),
end_seconds="" if end_seconds in (None, "") else str(end_seconds),
)
class ProcessRunner:
def __init__(self, *, plugin, api_session, library, get_model_def, active_job: dict, preview_state: dict, ui_skip, ui_update, info_exit, reset_live_chunk_status) -> None:
self.plugin = plugin
self.api_session = api_session
self.library = library
self.get_model_def = get_model_def
self.active_job = active_job
self.preview_state = preview_state
self.ui_skip = ui_skip
self.ui_update = ui_update
self.info_exit = info_exit
self.reset_live_chunk_status = reset_live_chunk_status
def start_process(self, state=None, process_name="", user_refs=None, source_path="", process_strength=None, output_path="", prompt_text="", continue_enabled=True, source_audio_track="", output_resolution="720p", target_ratio="", chunk_size_seconds=10.0, sliding_window_overlap=1, start_seconds="", end_seconds=""):
if self.active_job.get("running"):
yield self.info_exit("A process is already running.")
return
request = RunRequest.from_gradio(state, process_name, user_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds)
process_definition = self.library.process_definition(request.process_name, request.state, request.user_refs)
if process_definition is None:
yield self.info_exit(f"Unsupported process: {request.process_name}")
return
system_handler = self.library.system_handler_for_definition(process_definition)
if process_definition.get("source") == "user":
problems = self.library.validate_user_process_definition(process_definition)
if len(problems) > 0:
yield self.info_exit(self.library.format_user_process_validation_error(process_definition, problems))
return
process_settings = process_definition["settings"]
model_type = str(process_settings.get("model_type") or "")
if len(model_type) == 0 and system_handler is None:
yield self.info_exit(f"Unsupported process: {request.process_name}")
return
process_display_name = str(process_definition.get("name") or request.process_name or "").strip()
process_is_hdr = False if system_handler is not None else VIDEO_PROMPT_HDR_OUTPUT_FLAG in str(process_settings.get("video_prompt_type") or "")
is_user_process = process_definition.get("source") == "user"
has_outpaint_setting = "video_guide_outpainting" in process_settings
uses_builtin_outpaint_ui = self.library.uses_builtin_outpaint_ui(process_definition)
user_lora_strength_override_default = self.library.user_lora_strength_override_default(process_definition)
use_lora_strength_override = system_handler is None and not uses_builtin_outpaint_ui and (not is_user_process or user_lora_strength_override_default is not None)
process_strength_default = user_lora_strength_override_default if user_lora_strength_override_default is not None else common.get_default_process_strength(process_settings)
active_process_strength = 1.0 if uses_builtin_outpaint_ui else (common.coerce_float(request.process_strength, process_strength_default) if use_lora_strength_override else process_strength_default)
source_path = request.source_path
output_path = request.output_path
source_audio_track = request.source_audio_track
output_resolution = request.output_resolution
target_ratio = request.target_ratio
prompt_text = request.prompt_text
start_seconds = request.start_seconds
end_seconds = request.end_seconds
if system_handler is not None and callable(getattr(system_handler, "normalize_target_control_for_process", None)):
system_target_control = system_handler.normalize_target_control_for_process(request.target_ratio, process_settings)
else:
system_target_control = system_handler.normalize_target_control(request.target_ratio) if system_handler is not None and hasattr(system_handler, "normalize_target_control") else ""
system_supports_continue_cache = system_handler is not None and (not callable(getattr(system_handler, "supports_continue_cache_for_target", None)) or system_handler.supports_continue_cache_for_target(system_target_control))
try:
chunk_size_seconds = common.require_float(request.chunk_size_seconds, "Chunk Size", minimum=0.1)
sliding_window_overlap = int(getattr(system_handler, "overlap_frames", 0)) if system_handler is not None else common.require_int(request.sliding_window_overlap, "Sliding Window Overlap", minimum=1)
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or "Invalid processing settings.")
return
try:
catalog.save_process_full_video_ui_settings({
"process_model_type": model_type,
"process_name": request.process_name,
"source_path": source_path,
"process_strength": active_process_strength,
"output_path": output_path,
"prompt": prompt_text,
"continue_enabled": request.continue_enabled,
"source_audio_track": source_audio_track,
"output_resolution": output_resolution,
"target_ratio": system_target_control if system_handler is not None else target_ratio,
"chunk_size_seconds": chunk_size_seconds,
"sliding_window_overlap": sliding_window_overlap,
"start_seconds": start_seconds,
"end_seconds": end_seconds,
})
except OSError as exc:
yield self.info_exit(f"Unable to save plugin settings to {catalog.PROCESS_FULL_VIDEO_SETTINGS_FILE}: {exc}")
return
active_target_ratio = target_ratio if uses_builtin_outpaint_ui else ""
default_prompt_text = str(process_settings.get("prompt") or "")
if len(prompt_text.strip()) == 0:
prompt_text = default_prompt_text
if not os.path.isfile(source_path):
yield self.info_exit(f"Source video not found: {source_path}")
return
try:
start_seconds = prompts.parse_time_input(start_seconds, label="Start", allow_empty=False)
end_seconds = prompts.parse_time_input(end_seconds, label="End", allow_empty=True)
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or "Invalid start/end selection.")
return
try:
prompt_schedule = prompts.parse_prompt_schedule(prompt_text)
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or f"Invalid prompt syntax.\n\nExample:\n{prompts.TIMED_PROMPT_EXAMPLE}")
return
started_ui = False
preflight_stage = True
write_state = MuxSession()
total_chunks_display = 1
completed_chunks = 0
resumed_unique_frames = 0
self.active_job["cancel_requested"] = False
self.active_job["write_state"] = write_state
self.active_job["running"] = True
try:
video.clear_process_full_video_source()
yield self.ui_update(status_ui.render_chunk_status_html(1, 0, 1, "Initializing", "Preparing processing job..."), self.ui_skip, str(time.time_ns()), start_enabled=False, abort_enabled=False)
started_ui = True
try:
prepared_run = prepare_run(
plugin=self.plugin,
get_model_def=self.get_model_def,
process_name=request.process_name,
process_display_name=process_display_name,
source_path=source_path,
output_path=output_path,
output_resolution=output_resolution,
active_target_ratio=active_target_ratio,
continue_enabled=request.continue_enabled,
source_audio_track=source_audio_track,
chunk_size_seconds=chunk_size_seconds,
sliding_window_overlap=sliding_window_overlap,
start_seconds=start_seconds,
end_seconds=end_seconds,
model_type=model_type,
process_is_hdr=process_is_hdr,
uses_builtin_outpaint_ui=uses_builtin_outpaint_ui,
system_handler=system_handler,
system_target_control=system_target_control,
)
verbose_level = prepared_run.verbose_level
start_frame = prepared_run.start_frame
end_frame_exclusive = prepared_run.end_frame_exclusive
fps_float = prepared_run.fps_float
total_source_frames = prepared_run.total_source_frames
processing_fps = prepared_run.processing_fps
selected_audio_track = prepared_run.selected_audio_track
frame_plan_rules = prepared_run.frame_plan_rules
budget_resolution = prepared_run.budget_resolution
chunk_frames = prepared_run.chunk_frames
overlap_frames = prepared_run.overlap_frames
full_plans = prepared_run.full_plans
requested_unique_frames = prepared_run.requested_unique_frames
requested_source_segment = prepared_run.requested_source_segment
output_path = prepared_run.output_path
resume_existing_output = prepared_run.resume_existing_output
ffmpeg_path = prepared_run.ffmpeg_path
ffprobe_path = prepared_run.ffprobe_path
output_container = prepared_run.output_container
merged_continuation_signatures = prepared_run.merged_continuation_signatures
except ProcessInfoExit as exc:
yield self.info_exit(exc.message, output=exc.output_path or self.ui_skip)
return
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or "Invalid process settings.", output=output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip)
return
preflight_stage = False
if resume_existing_output:
recovery_result = yield from continuation_recovery.recover_residual_continuations(
plugin=self.plugin,
ffmpeg_path=ffmpeg_path,
ffprobe_path=ffprobe_path,
output_path=output_path,
full_plans=full_plans,
output_container=output_container,
fps_float=fps_float,
selected_audio_track=selected_audio_track,
source_path=source_path,
start_frame=start_frame,
merged_signatures=merged_continuation_signatures,
verbose_level=verbose_level,
ui_update=self.ui_update,
ui_skip=self.ui_skip,
system_handler=system_handler,
system_supports_continue_cache=system_supports_continue_cache,
)
merged_continuation_signatures = recovery_result.merged_signatures
if recovery_result.blocked:
return
if self.active_job.get("cancel_requested"):
write_state.stopped = True
yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans) if len(full_plans) > 0 else 1, 0, 1, "Stopped", "Stopped before processing a new chunk."), output_path if os.path.isfile(output_path) else self.ui_skip, self.ui_skip, start_enabled=True, abort_enabled=False)
return
last_frame_image = None
last_segment_path = None
continuation_output_path = ""
chunk_output_paths: list[str] = []
written_unique_frames = 0
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
resolved_resolution = ""
resolved_width = 0
resolved_height = 0
merged_continuation = False
resume_audio_trim_seconds = 0.0
continue_cache = None
self.preview_state["image"] = None
write_state.set_output_path(output_path)
exact_start_seconds = start_frame / fps_float
if resume_existing_output:
yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans), 0, 1, "Inspecting Existing Output", f"Inspecting existing output to continue: {output_path}"), output_path, str(time.time_ns()))
process_metadata.log_existing_output_metadata(output_path, verbose_level)
resumed_unique_frames, resume_reason = media.probe_resume_frame_count(ffprobe_path, output_path, fps_float)
recorded_written_unique_frames = process_metadata.read_recorded_written_unique_frames(output_path)
if 0 < recorded_written_unique_frames < resumed_unique_frames:
common.plugin_info(f"Output contains {resumed_unique_frames} readable frame(s), but metadata recorded {recorded_written_unique_frames}. Using the real frame count for continuation.")
process_metadata.store_process_progress(output_path, written_unique_frames=resumed_unique_frames, merged_signatures=merged_continuation_signatures, verbose_level=verbose_level)
elif recorded_written_unique_frames > resumed_unique_frames:
common.plugin_info(f"Ignoring recorded output progress of {recorded_written_unique_frames} frame(s) because the output only probes as {resumed_unique_frames} frame(s).")
if resumed_unique_frames < 0:
common.plugin_info(f"Ignoring negative probed output progress from {output_path}.")
resumed_unique_frames = 0
elif resumed_unique_frames > requested_unique_frames:
common.plugin_info(f"Existing output already covers the requested {requested_unique_frames} frame(s).")
resumed_unique_frames = requested_unique_frames
if resumed_unique_frames <= 0:
common.plugin_info(f"Unable to continue from existing output: {output_path}. {resume_reason or 'Starting a new file instead.'}")
output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info)
write_state.set_output_path(output_path)
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
exact_start_seconds = start_frame / fps_float
resume_existing_output = False
else:
common.plugin_info(f"Continuing existing output: {output_path}")
resolved_resolution, resolved_width, resolved_height = video.probe_existing_output_resolution(output_path)
print(f"[Process Full Video] Continuing with locked output resolution {resolved_resolution}")
completed_chunks, _ = (frames.count_completed_written_chunks if system_handler is not None else frames.count_completed_chunks)(full_plans, resumed_unique_frames)
exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float
if resumed_unique_frames < requested_unique_frames:
resume_phase = "Planning Source Overlap" if system_handler is not None else "Loading Overlap Frames"
yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans), 0, 1, resume_phase, f"Continuing existing output with {resumed_unique_frames} frame(s) already written."), output_path, str(time.time_ns()))
checked_unique_frames, last_frame_image, tail_reason = video.resolve_resume_last_frame(output_path, resumed_unique_frames)
if system_handler is not None:
if checked_unique_frames > 0:
resumed_unique_frames = checked_unique_frames
if tail_reason:
common.plugin_info(tail_reason)
if last_frame_image is not None:
self.preview_state["image"] = last_frame_image
if system_supports_continue_cache and hasattr(system_handler, "load_continue_cache"):
sidecar_exists = callable(getattr(system_handler, "cache_sidecar_path", None)) and Path(system_handler.cache_sidecar_path(output_path)).is_file()
if sidecar_exists or not callable(getattr(system_handler, "continue_cache_from_tail_frames", None)):
continue_cache = system_handler.load_continue_cache(output_path)
else:
fallback_frames = min(int(getattr(system_handler, "overlap_frames", 0)), int(resumed_unique_frames))
fallback_tail = video.load_process_full_video_overlap_buffer(output_path, fallback_frames, resumed_unique_frames)
continue_cache = system_handler.continue_cache_from_tail_frames(fallback_tail, system_target_control)
if continue_cache is None:
continue_cache = system_handler.load_continue_cache(output_path)
common.plugin_info("FlashVSR continuation sidecar is missing; continuing from decoded output tail frames. The first resumed chunk may have lower temporal continuity.")
completed_chunks, _ = frames.count_completed_written_chunks(full_plans, resumed_unique_frames)
exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float
resume_overlap_frames = 0
if resumed_unique_frames < requested_unique_frames:
print(f"[Process Full Video] Continuing system process from source frame {start_frame + resumed_unique_frames}" + (" using continue cache" if system_supports_continue_cache else ""))
elif checked_unique_frames <= 0 or last_frame_image is None:
common.plugin_info(f"Unable to continue from existing output: {output_path}. {tail_reason or 'Starting a new file instead.'}")
output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info)
write_state.set_output_path(output_path)
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
self.preview_state["image"] = None
exact_start_seconds = start_frame / fps_float
resume_existing_output = False
else:
resumed_unique_frames = checked_unique_frames
if tail_reason:
common.plugin_info(tail_reason)
self.preview_state["image"] = last_frame_image
completed_chunks, _ = frames.count_completed_chunks(full_plans, resumed_unique_frames)
exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float
resume_overlap_frames = overlap_frames
if resumed_unique_frames < resume_overlap_frames:
resume_overlap_frames = resumed_unique_frames
overlap_tensor = video.load_process_full_video_hdr_overlap_buffer(output_path, resume_overlap_frames, resumed_unique_frames) if process_is_hdr else video.load_process_full_video_overlap_buffer(output_path, resume_overlap_frames, resumed_unique_frames)
if overlap_tensor is None:
common.plugin_info(f"Unable to continue from existing output: {output_path}. Failed to load the overlap frames from the recorded output.")
output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info)
write_state.set_output_path(output_path)
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
self.preview_state["image"] = None
exact_start_seconds = start_frame / fps_float
resume_existing_output = False
else:
resume_overlap_frames = int(overlap_tensor.shape[1])
video.set_process_full_video_overlap_buffer(overlap_tensor, processing_fps, hdr=process_is_hdr)
print(f"[Process Full Video] Loaded overlap buffer from existing output: {frames.describe_frame_range(start_frame + resumed_unique_frames - resume_overlap_frames, resume_overlap_frames)}")
if resume_existing_output and resumed_unique_frames < requested_unique_frames:
if not continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE:
resume_audio_trim_seconds = media.probe_selected_audio_overhang(ffprobe_path, output_path, selected_audio_track, resumed_unique_frames / fps_float)
if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None:
print(f"[Process Full Video] Final merge: rebuilding audio from source starting at {start_frame / fps_float:.6f}s")
elif resume_audio_trim_seconds > 0.0:
print(f"[Process Full Video] Final merge: trimming {resume_audio_trim_seconds:.6f}s from continuation audio and clamping segment audio to visible video duration")
remaining_resume_unique_frames = frames.align_total_unique_frames(
end_frame_exclusive - (start_frame + resumed_unique_frames),
frame_step=frame_plan_rules.frame_step,
minimum_requested_frames=frame_plan_rules.minimum_requested_frames,
initial_overlap_frames=resume_overlap_frames,
)
if remaining_resume_unique_frames <= 0:
trailing_frames = requested_unique_frames - resumed_unique_frames
common.plugin_info(f"Existing output has {resumed_unique_frames} frame(s). The remaining {trailing_frames} frame(s) are too short to build another continuation chunk for the current model, so the existing output is treated as complete.")
resumed_unique_frames = requested_unique_frames
completed_chunks = len(full_plans)
plans = []
else:
continuation_output_path = output_paths.make_continuation_output_path(output_path)
write_state.set_output_path(continuation_output_path)
try:
plans = frames.build_chunk_plan(
start_frame + resumed_unique_frames,
end_frame_exclusive,
total_source_frames,
chunk_frames,
frame_step=frame_plan_rules.frame_step,
minimum_requested_frames=frame_plan_rules.minimum_requested_frames,
overlap_frames=overlap_frames,
initial_overlap_frames=resume_overlap_frames,
)
except frames.FramePlanningError as exc:
raise gr.Error(str(exc)) from exc
elif resume_existing_output:
plans = []
if not resume_existing_output:
plans = full_plans
continued_mode = resumed_unique_frames > 0
use_live_av_mux = selected_audio_track is not None
total_chunks_display = completed_chunks + len(plans)
current_chunk_display = completed_chunks + 1
run_started_at = time.time()
initial_completed_chunks = completed_chunks
def _timing_kwargs(current_completed_chunks=None, phase_current_step=None, phase_total_steps=None):
elapsed_seconds = time.time() - run_started_at
if elapsed_seconds < 0.0:
elapsed_seconds = 0.0
if len(plans) == 0:
return {"elapsed_seconds": elapsed_seconds, "eta_seconds": None}
completed_for_eta = completed_chunks if current_completed_chunks is None else current_completed_chunks
run_completed_chunks = completed_for_eta - initial_completed_chunks
phase_ratio = 0.0
if phase_current_step is not None and phase_total_steps is not None and phase_total_steps > 0:
phase_ratio = float(phase_current_step) / float(phase_total_steps)
overall_ratio = (run_completed_chunks + phase_ratio) / float(len(plans))
eta_seconds = None if overall_ratio <= 0.0 or overall_ratio >= 1.0 else elapsed_seconds * (1.0 - overall_ratio) / overall_ratio
return {"elapsed_seconds": elapsed_seconds, "eta_seconds": eta_seconds}
if len(plans) == 0:
if system_handler is not None and callable(getattr(system_handler, "delete_continue_cache", None)):
system_handler.delete_continue_cache(output_path)
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, completed_chunks, "Completed", "Existing output already covers the requested range.", continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()), start_enabled=True, abort_enabled=False)
return
planning_text = f"Resuming from {resumed_unique_frames} frame(s) already written." if resumed_unique_frames > 0 else f"Preparing {len(plans)} chunk(s)..."
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Planning", planning_text, continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()))
chunk_progress = ChunkProgress(
completed_chunks=completed_chunks,
current_chunk_display=current_chunk_display,
chunk_output_paths=chunk_output_paths,
last_segment_path=last_segment_path,
write_state=write_state,
resolved_resolution=resolved_resolution,
resolved_width=resolved_width,
resolved_height=resolved_height,
continue_cache=continue_cache,
)
continue_cache = None
chunk_result = yield from ChunkExecutor(
plugin=self.plugin,
api_session=self.api_session,
active_job=self.active_job,
preview_state=self.preview_state,
ui_update=self.ui_update,
ui_skip=self.ui_skip,
reset_live_chunk_status=self.reset_live_chunk_status,
).run(ProcessContext(
state=request.state,
process_settings=process_definition["settings"],
model_type=model_type,
process_is_hdr=process_is_hdr,
is_user_process=is_user_process,
has_outpaint_setting=has_outpaint_setting,
uses_builtin_outpaint_ui=uses_builtin_outpaint_ui,
use_lora_strength_override=use_lora_strength_override,
active_process_strength=active_process_strength,
active_target_ratio=active_target_ratio,
source_path=source_path,
output_path=output_path,
selected_audio_track=selected_audio_track,
prompt_schedule=prompt_schedule,
default_prompt_text=default_prompt_text,
budget_resolution=budget_resolution,
start_frame=start_frame,
resumed_unique_frames=resumed_unique_frames,
requested_unique_frames=requested_unique_frames,
overlap_frames=overlap_frames,
processing_fps=processing_fps,
fps_float=fps_float,
continued_mode=continued_mode,
plans=plans,
total_chunks_display=total_chunks_display,
ffmpeg_path=ffmpeg_path,
use_live_av_mux=use_live_av_mux,
output_container=output_container,
exact_start_seconds=exact_start_seconds,
timing_kwargs=_timing_kwargs,
system_handler=system_handler,
system_target_control=system_target_control,
), chunk_progress)
written_unique_frames = chunk_result.written_unique_frames
completed_chunks = chunk_result.completed_chunks
current_chunk_display = chunk_result.current_chunk_display
resolved_resolution = chunk_result.resolved_resolution
resolved_width = chunk_result.resolved_width
resolved_height = chunk_result.resolved_height
last_segment_path = chunk_result.last_segment_path
chunk_output_paths = chunk_result.chunk_output_paths
continue_cache = chunk_result.continue_cache
if write_state.mux_process is None:
if write_state.stopped and resumed_unique_frames > 0:
common.plugin_info(f"Processing was stopped before writing a new chunk. Kept existing output at {output_path}")
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", "Stopped before a new chunk was written. Existing output kept.", continued=continued_mode, **_timing_kwargs()), output_path, self.ui_skip, start_enabled=True, abort_enabled=False)
return
if write_state.stopped:
common.plugin_info("Processing was stopped before any output chunk was written.")
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", "Stopped before any output chunk was written.", continued=continued_mode, **_timing_kwargs()), self.ui_skip, self.ui_skip, start_enabled=True, abort_enabled=False)
return
raise gr.Error("Processing completed without creating an output file.")
if self.active_job.get("cancel_requested"):
write_state.stopped = True
finalizing_message = "Finalizing written output before merge..." if continuation_output_path and os.path.isfile(write_state.output_path_for_write) else "Finalizing written output..."
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Finalizing Output", finalizing_message, continued=continued_mode, **_timing_kwargs()), output_path if os.path.isfile(output_path) else self.ui_skip, str(time.time_ns()), start_enabled=False, abort_enabled=False)
return_code, stderr, forced_termination = write_state.finalize()
if self.active_job.get("cancel_requested"):
write_state.stopped = True
if forced_termination:
raise gr.Error("ffmpeg did not finalize the partial output in time.")
if return_code != 0 and not (write_state.stopped and os.path.isfile(write_state.output_path_for_write if use_live_av_mux else write_state.video_only_output_path)):
raise gr.Error(stderr or "ffmpeg failed while assembling the processed video.")
if use_live_av_mux and os.path.isfile(write_state.output_path_for_write) and os.path.getsize(write_state.output_path_for_write) <= 0:
media.delete_file_if_exists(write_state.output_path_for_write, label="continuation output")
raise gr.Error("ffmpeg created an empty continuation file.")
if not use_live_av_mux and os.path.isfile(write_state.video_only_output_path):
try:
os.replace(write_state.video_only_output_path, write_state.output_path_for_write)
except OSError as exc:
raise gr.Error(f"Unable to finalize the written video-only segment: {write_state.output_path_for_write}") from exc
undeleted_merged_continuation_paths: list[str] = []
if continuation_output_path and os.path.isfile(write_state.output_path_for_write):
continuation_signature = process_metadata.make_continuation_signature(write_state.output_path_for_write)
try:
existing_output_generation_time = process_metadata.read_metadata_generation_time(output_path)
merged_duration_seconds = float(resumed_unique_frames + written_unique_frames) / fps_float
committed_signatures = process_metadata.append_merged_continuation_signature(merged_continuation_signatures, continuation_signature)
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Merging Continuation", "Merging the continued segment into the main output...", continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()), start_enabled=False, abort_enabled=False)
media.concat_video_segments(
ffmpeg_path,
[output_path, write_state.output_path_for_write],
output_path,
self.plugin.server_config.get("video_output_codec", "libx264_8"),
output_container,
self.plugin.server_config.get("audio_output_codec", "aac_128"),
segment_audio_trim_seconds=[0.0, resume_audio_trim_seconds],
segment_audio_duration_seconds=[(float(resumed_unique_frames) / fps_float) if resumed_unique_frames > 0 else None, (float(written_unique_frames) / fps_float) if written_unique_frames > 0 else None],
fps_float=fps_float,
selected_audio_track_no=selected_audio_track,
reserved_metadata_path=write_state.reserved_metadata_path,
source_audio_path=source_path if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
source_audio_start_seconds=(start_frame / fps_float) if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
source_audio_duration_seconds=merged_duration_seconds if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
source_audio_track_no=selected_audio_track if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
)
merged_continuation = True
merged_continuation_signatures = committed_signatures
process_metadata.store_process_progress(output_path, written_unique_frames=resumed_unique_frames + written_unique_frames, merged_signatures=merged_continuation_signatures, verbose_level=verbose_level)
except media.ContinuationMergeOutputLockedError:
locked_message = f"{Path(output_path).name} is open, so the continuation merge could not replace it. Existing output was kept and {Path(write_state.output_path_for_write).name} was preserved. Release the base file and start a process again."
gr.Info(locked_message)
media.delete_file_if_exists(write_state.reserved_metadata_path, label="reserved metadata file")
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Merge Pending", locked_message, continued=continued_mode, **_timing_kwargs()), write_state.output_path_for_write, str(time.time_ns()), start_enabled=True, abort_enabled=False)
return
except Exception as exc:
raise gr.Error(f"Failed to finalize continued output. Existing output kept, and continuation was preserved at {continuation_output_path}. {exc}") from exc
if os.path.isfile(write_state.output_path_for_write):
try:
os.remove(write_state.output_path_for_write)
if system_handler is not None and callable(getattr(system_handler, "delete_continue_cache", None)):
system_handler.delete_continue_cache(write_state.output_path_for_write)
except OSError:
undeleted_merged_continuation_paths.append(write_state.output_path_for_write)
common.plugin_info(f"Merged continuation progress into {Path(output_path).name}, but {Path(write_state.output_path_for_write).name} could not be deleted because it is still open. Delete it manually when released.")
else:
existing_output_generation_time = 0.0
media.delete_file_if_exists(write_state.reserved_metadata_path, label="reserved metadata file")
total_written_unique_frames = resumed_unique_frames + written_unique_frames
if not write_state.stopped and total_written_unique_frames < requested_unique_frames:
raise gr.Error(f"Processing wrote {total_written_unique_frames} frame(s), but {requested_unique_frames} frame(s) were required.")
metadata_target_path = output_path if merged_continuation or not continuation_output_path else write_state.output_path_for_write
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Writing Metadata", "Writing final output metadata...", continued=continued_mode, **_timing_kwargs()), metadata_target_path if os.path.isfile(metadata_target_path) else output_path, str(time.time_ns()), start_enabled=False, abort_enabled=False)
metadata_source_path = last_segment_path or media.get_last_generated_video_path(chunk_output_paths) or metadata_target_path
actual_output_frames = media.probe_resume_frame_count(ffprobe_path, metadata_target_path, fps_float)[0] if os.path.isfile(metadata_target_path) else 0
if actual_output_frames <= 0:
if metadata_target_path != output_path or not resume_existing_output:
media.delete_file_if_exists(metadata_target_path, label="invalid output")
raise gr.Error("Final output does not contain a readable video frame.")
if actual_output_frames < total_written_unique_frames:
if write_state.stopped:
common.plugin_info(f"Stopped output contains {actual_output_frames} readable frame(s), lower than the {total_written_unique_frames} frame(s) attempted. Recording the probed frame count.")
total_written_unique_frames = actual_output_frames
else:
raise gr.Error(f"Final output contains {actual_output_frames} readable frame(s), but {total_written_unique_frames} frame(s) were written.")
total_generation_time = existing_output_generation_time + process_metadata.read_metadata_generation_time(metadata_source_path) if merged_continuation else process_metadata.read_metadata_generation_time(metadata_source_path)
output_process_metadata = {
"process": process_display_name,
"written_unique_frames": int(total_written_unique_frames),
"chunks": int(total_chunks_display),
"sliding_window_overlap": int(overlap_frames),
"start_seconds": float(start_seconds),
"end_seconds": float(start_seconds + (total_written_unique_frames / float(fps_float))),
"source_video": source_path,
"source_segment": requested_source_segment,
"merged_continuations": process_metadata.normalize_merged_continuation_signatures(merged_continuation_signatures),
}
if process_is_hdr:
output_process_metadata["hdr"] = True
metadata_written = process_metadata.store_output_metadata(metadata_target_path, metadata_source_path, source_path=source_path, process_name=process_display_name, source_start_seconds=start_seconds, start_frame=start_frame, fps_float=fps_float, selected_audio_track=selected_audio_track, total_generation_time=total_generation_time, actual_frame_count=actual_output_frames, process_metadata=output_process_metadata, verbose_level=verbose_level)
completed_output = not write_state.stopped and (total_written_unique_frames >= requested_unique_frames or completed_chunks >= total_chunks_display)
if system_handler is not None and completed_output and callable(getattr(system_handler, "delete_continue_cache", None)):
for cache_output_path in dict.fromkeys([metadata_target_path, output_path, write_state.output_path_for_write]):
if cache_output_path:
system_handler.delete_continue_cache(cache_output_path)
continue_cache = None
elif system_handler is not None and continue_cache is not None and hasattr(system_handler, "save_continue_cache"):
system_handler.save_continue_cache(continue_cache, metadata_target_path, metadata=output_process_metadata)
if not metadata_written:
raise gr.Error(f"Failed to write WanGP metadata to {metadata_target_path}. The partial output was kept, but continuation may require the sidecar cache.")
chunk_output_paths = media.delete_released_chunk_outputs(request.state, chunk_output_paths)
if write_state.stopped:
stopped_output_path = output_path
if merged_continuation:
common.plugin_info(f"Processing was stopped. Merged continued progress into {output_path}")
stop_message = f"Stopped after {total_written_unique_frames} frame(s). Continued progress was merged into the output."
elif continuation_output_path and os.path.isfile(write_state.output_path_for_write):
stopped_output_path = write_state.output_path_for_write
common.plugin_info(f"Processing was stopped. Kept existing output at {output_path} and preserved continuation clip at {write_state.output_path_for_write}")
stop_message = f"Stopped after {total_written_unique_frames} frame(s). Existing output kept and continuation clip preserved."
else:
common.plugin_info(f"Processing was stopped. Kept partial output at {output_path}")
stop_message = f"Stopped after {total_written_unique_frames} frame(s). Partial output kept."
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", stop_message, continued=continued_mode, **_timing_kwargs()), stopped_output_path, self.ui_skip, start_enabled=True, abort_enabled=False)
return
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, total_chunks_display, total_chunks_display, "Completed", f"Completed {total_chunks_display} chunk(s).", continued=continued_mode, **_timing_kwargs()), output_path, self.ui_skip, start_enabled=True, abort_enabled=False)
self.active_job["job"] = None
write_state.cleanup_partial_outputs()
except gr.Error as exc:
self.active_job["job"] = None
write_state.cleanup_partial_outputs()
status_message = common.get_error_message(exc) or "Processing failed."
if not started_ui:
gr.Info(status_message)
return
if started_ui:
total_chunks_value = total_chunks_display
completed_value = completed_chunks
current_value = completed_chunks + 1 if completed_chunks < total_chunks_display else total_chunks_display
continued_value = resumed_unique_frames > 0
output_value = output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip
if preflight_stage:
gr.Info(status_message)
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_value, completed_value, current_value, "Info" if preflight_stage else "Error", status_message, continued=continued_value), output_value, self.ui_skip, start_enabled=True, abort_enabled=False)
return
except BaseException as exc:
self.active_job["job"] = None
write_state.cleanup_partial_outputs()
if started_ui:
total_chunks_value = total_chunks_display
completed_value = completed_chunks
current_value = completed_chunks + 1 if completed_chunks < total_chunks_display else total_chunks_display
continued_value = resumed_unique_frames > 0
status_message = common.get_error_message(exc) or exc.__class__.__name__
output_value = output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_value, completed_value, current_value, "Error", status_message, continued=continued_value), output_value, self.ui_skip, start_enabled=True, abort_enabled=False)
raise
finally:
self.active_job["running"] = False
self.active_job["write_state"] = None
video.clear_process_full_video_source()
|