from __future__ import annotations import os import time from dataclasses import dataclass, field from pathlib import Path import gradio as gr from shared.utils.hdr import VIDEO_PROMPT_HDR_OUTPUT_FLAG from .chunk_executor import ChunkExecutor, ChunkProgress, ProcessContext from . import common from . import continuation_recovery from . import frame_planning as frames from . import media_io as media from . import output_paths from . import process_catalog as catalog from . import process_metadata from . import prompt_schedule as prompts from . import status_ui from . import video_buffers as video from .mux_session import MuxSession from .run_preparation import ProcessInfoExit, prepare_run @dataclass(frozen=True) class RunRequest: state: dict | None = None process_name: str = "" user_refs: list[str] = field(default_factory=list) source_path: str = "" process_strength: object = None output_path: str = "" prompt_text: str = "" continue_enabled: bool = True source_audio_track: str = "" output_resolution: str = "720p" target_ratio: str = "" chunk_size_seconds: object = 10.0 sliding_window_overlap: object = 1 start_seconds: str = "" end_seconds: str = "" @classmethod def from_gradio( cls, state=None, process_name="", user_refs=None, source_path="", process_strength=None, output_path="", prompt_text="", continue_enabled=True, source_audio_track="", output_resolution="720p", target_ratio="", chunk_size_seconds=10.0, sliding_window_overlap=1, start_seconds="", end_seconds="", ) -> "RunRequest": return cls( state=state, process_name=str(process_name or "").strip(), user_refs=list(user_refs or []), source_path=str(source_path or "").strip(), process_strength=process_strength, output_path=str(output_path or "").strip(), prompt_text=str(prompt_text or ""), continue_enabled=bool(continue_enabled), source_audio_track=str(source_audio_track or "").strip(), output_resolution=str(output_resolution or "720p").strip(), target_ratio=str(target_ratio or "").strip(), chunk_size_seconds=chunk_size_seconds, sliding_window_overlap=sliding_window_overlap, start_seconds="" if start_seconds in (None, "") else str(start_seconds), end_seconds="" if end_seconds in (None, "") else str(end_seconds), ) class ProcessRunner: def __init__(self, *, plugin, api_session, library, get_model_def, active_job: dict, preview_state: dict, ui_skip, ui_update, info_exit, reset_live_chunk_status) -> None: self.plugin = plugin self.api_session = api_session self.library = library self.get_model_def = get_model_def self.active_job = active_job self.preview_state = preview_state self.ui_skip = ui_skip self.ui_update = ui_update self.info_exit = info_exit self.reset_live_chunk_status = reset_live_chunk_status def start_process(self, state=None, process_name="", user_refs=None, source_path="", process_strength=None, output_path="", prompt_text="", continue_enabled=True, source_audio_track="", output_resolution="720p", target_ratio="", chunk_size_seconds=10.0, sliding_window_overlap=1, start_seconds="", end_seconds=""): if self.active_job.get("running"): yield self.info_exit("A process is already running.") return request = RunRequest.from_gradio(state, process_name, user_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds) process_definition = self.library.process_definition(request.process_name, request.state, request.user_refs) if process_definition is None: yield self.info_exit(f"Unsupported process: {request.process_name}") return system_handler = self.library.system_handler_for_definition(process_definition) if process_definition.get("source") == "user": problems = self.library.validate_user_process_definition(process_definition) if len(problems) > 0: yield self.info_exit(self.library.format_user_process_validation_error(process_definition, problems)) return process_settings = process_definition["settings"] model_type = str(process_settings.get("model_type") or "") if len(model_type) == 0 and system_handler is None: yield self.info_exit(f"Unsupported process: {request.process_name}") return process_display_name = str(process_definition.get("name") or request.process_name or "").strip() process_is_hdr = False if system_handler is not None else VIDEO_PROMPT_HDR_OUTPUT_FLAG in str(process_settings.get("video_prompt_type") or "") is_user_process = process_definition.get("source") == "user" has_outpaint_setting = "video_guide_outpainting" in process_settings uses_builtin_outpaint_ui = self.library.uses_builtin_outpaint_ui(process_definition) user_lora_strength_override_default = self.library.user_lora_strength_override_default(process_definition) use_lora_strength_override = system_handler is None and not uses_builtin_outpaint_ui and (not is_user_process or user_lora_strength_override_default is not None) process_strength_default = user_lora_strength_override_default if user_lora_strength_override_default is not None else common.get_default_process_strength(process_settings) active_process_strength = 1.0 if uses_builtin_outpaint_ui else (common.coerce_float(request.process_strength, process_strength_default) if use_lora_strength_override else process_strength_default) source_path = request.source_path output_path = request.output_path source_audio_track = request.source_audio_track output_resolution = request.output_resolution target_ratio = request.target_ratio prompt_text = request.prompt_text start_seconds = request.start_seconds end_seconds = request.end_seconds if system_handler is not None and callable(getattr(system_handler, "normalize_target_control_for_process", None)): system_target_control = system_handler.normalize_target_control_for_process(request.target_ratio, process_settings) else: system_target_control = system_handler.normalize_target_control(request.target_ratio) if system_handler is not None and hasattr(system_handler, "normalize_target_control") else "" system_supports_continue_cache = system_handler is not None and (not callable(getattr(system_handler, "supports_continue_cache_for_target", None)) or system_handler.supports_continue_cache_for_target(system_target_control)) try: chunk_size_seconds = common.require_float(request.chunk_size_seconds, "Chunk Size", minimum=0.1) sliding_window_overlap = int(getattr(system_handler, "overlap_frames", 0)) if system_handler is not None else common.require_int(request.sliding_window_overlap, "Sliding Window Overlap", minimum=1) except gr.Error as exc: yield self.info_exit(common.get_error_message(exc) or "Invalid processing settings.") return try: catalog.save_process_full_video_ui_settings({ "process_model_type": model_type, "process_name": request.process_name, "source_path": source_path, "process_strength": active_process_strength, "output_path": output_path, "prompt": prompt_text, "continue_enabled": request.continue_enabled, "source_audio_track": source_audio_track, "output_resolution": output_resolution, "target_ratio": system_target_control if system_handler is not None else target_ratio, "chunk_size_seconds": chunk_size_seconds, "sliding_window_overlap": sliding_window_overlap, "start_seconds": start_seconds, "end_seconds": end_seconds, }) except OSError as exc: yield self.info_exit(f"Unable to save plugin settings to {catalog.PROCESS_FULL_VIDEO_SETTINGS_FILE}: {exc}") return active_target_ratio = target_ratio if uses_builtin_outpaint_ui else "" default_prompt_text = str(process_settings.get("prompt") or "") if len(prompt_text.strip()) == 0: prompt_text = default_prompt_text if not os.path.isfile(source_path): yield self.info_exit(f"Source video not found: {source_path}") return try: start_seconds = prompts.parse_time_input(start_seconds, label="Start", allow_empty=False) end_seconds = prompts.parse_time_input(end_seconds, label="End", allow_empty=True) except gr.Error as exc: yield self.info_exit(common.get_error_message(exc) or "Invalid start/end selection.") return try: prompt_schedule = prompts.parse_prompt_schedule(prompt_text) except gr.Error as exc: yield self.info_exit(common.get_error_message(exc) or f"Invalid prompt syntax.\n\nExample:\n{prompts.TIMED_PROMPT_EXAMPLE}") return started_ui = False preflight_stage = True write_state = MuxSession() total_chunks_display = 1 completed_chunks = 0 resumed_unique_frames = 0 self.active_job["cancel_requested"] = False self.active_job["write_state"] = write_state self.active_job["running"] = True try: video.clear_process_full_video_source() yield self.ui_update(status_ui.render_chunk_status_html(1, 0, 1, "Initializing", "Preparing processing job..."), self.ui_skip, str(time.time_ns()), start_enabled=False, abort_enabled=False) started_ui = True try: prepared_run = prepare_run( plugin=self.plugin, get_model_def=self.get_model_def, process_name=request.process_name, process_display_name=process_display_name, source_path=source_path, output_path=output_path, output_resolution=output_resolution, active_target_ratio=active_target_ratio, continue_enabled=request.continue_enabled, source_audio_track=source_audio_track, chunk_size_seconds=chunk_size_seconds, sliding_window_overlap=sliding_window_overlap, start_seconds=start_seconds, end_seconds=end_seconds, model_type=model_type, process_is_hdr=process_is_hdr, uses_builtin_outpaint_ui=uses_builtin_outpaint_ui, system_handler=system_handler, system_target_control=system_target_control, ) verbose_level = prepared_run.verbose_level start_frame = prepared_run.start_frame end_frame_exclusive = prepared_run.end_frame_exclusive fps_float = prepared_run.fps_float total_source_frames = prepared_run.total_source_frames processing_fps = prepared_run.processing_fps selected_audio_track = prepared_run.selected_audio_track frame_plan_rules = prepared_run.frame_plan_rules budget_resolution = prepared_run.budget_resolution chunk_frames = prepared_run.chunk_frames overlap_frames = prepared_run.overlap_frames full_plans = prepared_run.full_plans requested_unique_frames = prepared_run.requested_unique_frames requested_source_segment = prepared_run.requested_source_segment output_path = prepared_run.output_path resume_existing_output = prepared_run.resume_existing_output ffmpeg_path = prepared_run.ffmpeg_path ffprobe_path = prepared_run.ffprobe_path output_container = prepared_run.output_container merged_continuation_signatures = prepared_run.merged_continuation_signatures except ProcessInfoExit as exc: yield self.info_exit(exc.message, output=exc.output_path or self.ui_skip) return except gr.Error as exc: yield self.info_exit(common.get_error_message(exc) or "Invalid process settings.", output=output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip) return preflight_stage = False if resume_existing_output: recovery_result = yield from continuation_recovery.recover_residual_continuations( plugin=self.plugin, ffmpeg_path=ffmpeg_path, ffprobe_path=ffprobe_path, output_path=output_path, full_plans=full_plans, output_container=output_container, fps_float=fps_float, selected_audio_track=selected_audio_track, source_path=source_path, start_frame=start_frame, merged_signatures=merged_continuation_signatures, verbose_level=verbose_level, ui_update=self.ui_update, ui_skip=self.ui_skip, system_handler=system_handler, system_supports_continue_cache=system_supports_continue_cache, ) merged_continuation_signatures = recovery_result.merged_signatures if recovery_result.blocked: return if self.active_job.get("cancel_requested"): write_state.stopped = True yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans) if len(full_plans) > 0 else 1, 0, 1, "Stopped", "Stopped before processing a new chunk."), output_path if os.path.isfile(output_path) else self.ui_skip, self.ui_skip, start_enabled=True, abort_enabled=False) return last_frame_image = None last_segment_path = None continuation_output_path = "" chunk_output_paths: list[str] = [] written_unique_frames = 0 resumed_unique_frames = 0 resume_overlap_frames = 0 completed_chunks = 0 resolved_resolution = "" resolved_width = 0 resolved_height = 0 merged_continuation = False resume_audio_trim_seconds = 0.0 continue_cache = None self.preview_state["image"] = None write_state.set_output_path(output_path) exact_start_seconds = start_frame / fps_float if resume_existing_output: yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans), 0, 1, "Inspecting Existing Output", f"Inspecting existing output to continue: {output_path}"), output_path, str(time.time_ns())) process_metadata.log_existing_output_metadata(output_path, verbose_level) resumed_unique_frames, resume_reason = media.probe_resume_frame_count(ffprobe_path, output_path, fps_float) recorded_written_unique_frames = process_metadata.read_recorded_written_unique_frames(output_path) if 0 < recorded_written_unique_frames < resumed_unique_frames: common.plugin_info(f"Output contains {resumed_unique_frames} readable frame(s), but metadata recorded {recorded_written_unique_frames}. Using the real frame count for continuation.") process_metadata.store_process_progress(output_path, written_unique_frames=resumed_unique_frames, merged_signatures=merged_continuation_signatures, verbose_level=verbose_level) elif recorded_written_unique_frames > resumed_unique_frames: common.plugin_info(f"Ignoring recorded output progress of {recorded_written_unique_frames} frame(s) because the output only probes as {resumed_unique_frames} frame(s).") if resumed_unique_frames < 0: common.plugin_info(f"Ignoring negative probed output progress from {output_path}.") resumed_unique_frames = 0 elif resumed_unique_frames > requested_unique_frames: common.plugin_info(f"Existing output already covers the requested {requested_unique_frames} frame(s).") resumed_unique_frames = requested_unique_frames if resumed_unique_frames <= 0: common.plugin_info(f"Unable to continue from existing output: {output_path}. {resume_reason or 'Starting a new file instead.'}") output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info) write_state.set_output_path(output_path) resumed_unique_frames = 0 resume_overlap_frames = 0 completed_chunks = 0 exact_start_seconds = start_frame / fps_float resume_existing_output = False else: common.plugin_info(f"Continuing existing output: {output_path}") resolved_resolution, resolved_width, resolved_height = video.probe_existing_output_resolution(output_path) print(f"[Process Full Video] Continuing with locked output resolution {resolved_resolution}") completed_chunks, _ = (frames.count_completed_written_chunks if system_handler is not None else frames.count_completed_chunks)(full_plans, resumed_unique_frames) exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float if resumed_unique_frames < requested_unique_frames: resume_phase = "Planning Source Overlap" if system_handler is not None else "Loading Overlap Frames" yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans), 0, 1, resume_phase, f"Continuing existing output with {resumed_unique_frames} frame(s) already written."), output_path, str(time.time_ns())) checked_unique_frames, last_frame_image, tail_reason = video.resolve_resume_last_frame(output_path, resumed_unique_frames) if system_handler is not None: if checked_unique_frames > 0: resumed_unique_frames = checked_unique_frames if tail_reason: common.plugin_info(tail_reason) if last_frame_image is not None: self.preview_state["image"] = last_frame_image if system_supports_continue_cache and hasattr(system_handler, "load_continue_cache"): sidecar_exists = callable(getattr(system_handler, "cache_sidecar_path", None)) and Path(system_handler.cache_sidecar_path(output_path)).is_file() if sidecar_exists or not callable(getattr(system_handler, "continue_cache_from_tail_frames", None)): continue_cache = system_handler.load_continue_cache(output_path) else: fallback_frames = min(int(getattr(system_handler, "overlap_frames", 0)), int(resumed_unique_frames)) fallback_tail = video.load_process_full_video_overlap_buffer(output_path, fallback_frames, resumed_unique_frames) continue_cache = system_handler.continue_cache_from_tail_frames(fallback_tail, system_target_control) if continue_cache is None: continue_cache = system_handler.load_continue_cache(output_path) common.plugin_info("FlashVSR continuation sidecar is missing; continuing from decoded output tail frames. The first resumed chunk may have lower temporal continuity.") completed_chunks, _ = frames.count_completed_written_chunks(full_plans, resumed_unique_frames) exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float resume_overlap_frames = 0 if resumed_unique_frames < requested_unique_frames: print(f"[Process Full Video] Continuing system process from source frame {start_frame + resumed_unique_frames}" + (" using continue cache" if system_supports_continue_cache else "")) elif checked_unique_frames <= 0 or last_frame_image is None: common.plugin_info(f"Unable to continue from existing output: {output_path}. {tail_reason or 'Starting a new file instead.'}") output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info) write_state.set_output_path(output_path) resumed_unique_frames = 0 resume_overlap_frames = 0 completed_chunks = 0 self.preview_state["image"] = None exact_start_seconds = start_frame / fps_float resume_existing_output = False else: resumed_unique_frames = checked_unique_frames if tail_reason: common.plugin_info(tail_reason) self.preview_state["image"] = last_frame_image completed_chunks, _ = frames.count_completed_chunks(full_plans, resumed_unique_frames) exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float resume_overlap_frames = overlap_frames if resumed_unique_frames < resume_overlap_frames: resume_overlap_frames = resumed_unique_frames overlap_tensor = video.load_process_full_video_hdr_overlap_buffer(output_path, resume_overlap_frames, resumed_unique_frames) if process_is_hdr else video.load_process_full_video_overlap_buffer(output_path, resume_overlap_frames, resumed_unique_frames) if overlap_tensor is None: common.plugin_info(f"Unable to continue from existing output: {output_path}. Failed to load the overlap frames from the recorded output.") output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info) write_state.set_output_path(output_path) resumed_unique_frames = 0 resume_overlap_frames = 0 completed_chunks = 0 self.preview_state["image"] = None exact_start_seconds = start_frame / fps_float resume_existing_output = False else: resume_overlap_frames = int(overlap_tensor.shape[1]) video.set_process_full_video_overlap_buffer(overlap_tensor, processing_fps, hdr=process_is_hdr) print(f"[Process Full Video] Loaded overlap buffer from existing output: {frames.describe_frame_range(start_frame + resumed_unique_frames - resume_overlap_frames, resume_overlap_frames)}") if resume_existing_output and resumed_unique_frames < requested_unique_frames: if not continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE: resume_audio_trim_seconds = media.probe_selected_audio_overhang(ffprobe_path, output_path, selected_audio_track, resumed_unique_frames / fps_float) if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None: print(f"[Process Full Video] Final merge: rebuilding audio from source starting at {start_frame / fps_float:.6f}s") elif resume_audio_trim_seconds > 0.0: print(f"[Process Full Video] Final merge: trimming {resume_audio_trim_seconds:.6f}s from continuation audio and clamping segment audio to visible video duration") remaining_resume_unique_frames = frames.align_total_unique_frames( end_frame_exclusive - (start_frame + resumed_unique_frames), frame_step=frame_plan_rules.frame_step, minimum_requested_frames=frame_plan_rules.minimum_requested_frames, initial_overlap_frames=resume_overlap_frames, ) if remaining_resume_unique_frames <= 0: trailing_frames = requested_unique_frames - resumed_unique_frames common.plugin_info(f"Existing output has {resumed_unique_frames} frame(s). The remaining {trailing_frames} frame(s) are too short to build another continuation chunk for the current model, so the existing output is treated as complete.") resumed_unique_frames = requested_unique_frames completed_chunks = len(full_plans) plans = [] else: continuation_output_path = output_paths.make_continuation_output_path(output_path) write_state.set_output_path(continuation_output_path) try: plans = frames.build_chunk_plan( start_frame + resumed_unique_frames, end_frame_exclusive, total_source_frames, chunk_frames, frame_step=frame_plan_rules.frame_step, minimum_requested_frames=frame_plan_rules.minimum_requested_frames, overlap_frames=overlap_frames, initial_overlap_frames=resume_overlap_frames, ) except frames.FramePlanningError as exc: raise gr.Error(str(exc)) from exc elif resume_existing_output: plans = [] if not resume_existing_output: plans = full_plans continued_mode = resumed_unique_frames > 0 use_live_av_mux = selected_audio_track is not None total_chunks_display = completed_chunks + len(plans) current_chunk_display = completed_chunks + 1 run_started_at = time.time() initial_completed_chunks = completed_chunks def _timing_kwargs(current_completed_chunks=None, phase_current_step=None, phase_total_steps=None): elapsed_seconds = time.time() - run_started_at if elapsed_seconds < 0.0: elapsed_seconds = 0.0 if len(plans) == 0: return {"elapsed_seconds": elapsed_seconds, "eta_seconds": None} completed_for_eta = completed_chunks if current_completed_chunks is None else current_completed_chunks run_completed_chunks = completed_for_eta - initial_completed_chunks phase_ratio = 0.0 if phase_current_step is not None and phase_total_steps is not None and phase_total_steps > 0: phase_ratio = float(phase_current_step) / float(phase_total_steps) overall_ratio = (run_completed_chunks + phase_ratio) / float(len(plans)) eta_seconds = None if overall_ratio <= 0.0 or overall_ratio >= 1.0 else elapsed_seconds * (1.0 - overall_ratio) / overall_ratio return {"elapsed_seconds": elapsed_seconds, "eta_seconds": eta_seconds} if len(plans) == 0: if system_handler is not None and callable(getattr(system_handler, "delete_continue_cache", None)): system_handler.delete_continue_cache(output_path) yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, completed_chunks, "Completed", "Existing output already covers the requested range.", continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()), start_enabled=True, abort_enabled=False) return planning_text = f"Resuming from {resumed_unique_frames} frame(s) already written." if resumed_unique_frames > 0 else f"Preparing {len(plans)} chunk(s)..." yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Planning", planning_text, continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns())) chunk_progress = ChunkProgress( completed_chunks=completed_chunks, current_chunk_display=current_chunk_display, chunk_output_paths=chunk_output_paths, last_segment_path=last_segment_path, write_state=write_state, resolved_resolution=resolved_resolution, resolved_width=resolved_width, resolved_height=resolved_height, continue_cache=continue_cache, ) continue_cache = None chunk_result = yield from ChunkExecutor( plugin=self.plugin, api_session=self.api_session, active_job=self.active_job, preview_state=self.preview_state, ui_update=self.ui_update, ui_skip=self.ui_skip, reset_live_chunk_status=self.reset_live_chunk_status, ).run(ProcessContext( state=request.state, process_settings=process_definition["settings"], model_type=model_type, process_is_hdr=process_is_hdr, is_user_process=is_user_process, has_outpaint_setting=has_outpaint_setting, uses_builtin_outpaint_ui=uses_builtin_outpaint_ui, use_lora_strength_override=use_lora_strength_override, active_process_strength=active_process_strength, active_target_ratio=active_target_ratio, source_path=source_path, output_path=output_path, selected_audio_track=selected_audio_track, prompt_schedule=prompt_schedule, default_prompt_text=default_prompt_text, budget_resolution=budget_resolution, start_frame=start_frame, resumed_unique_frames=resumed_unique_frames, requested_unique_frames=requested_unique_frames, overlap_frames=overlap_frames, processing_fps=processing_fps, fps_float=fps_float, continued_mode=continued_mode, plans=plans, total_chunks_display=total_chunks_display, ffmpeg_path=ffmpeg_path, use_live_av_mux=use_live_av_mux, output_container=output_container, exact_start_seconds=exact_start_seconds, timing_kwargs=_timing_kwargs, system_handler=system_handler, system_target_control=system_target_control, ), chunk_progress) written_unique_frames = chunk_result.written_unique_frames completed_chunks = chunk_result.completed_chunks current_chunk_display = chunk_result.current_chunk_display resolved_resolution = chunk_result.resolved_resolution resolved_width = chunk_result.resolved_width resolved_height = chunk_result.resolved_height last_segment_path = chunk_result.last_segment_path chunk_output_paths = chunk_result.chunk_output_paths continue_cache = chunk_result.continue_cache if write_state.mux_process is None: if write_state.stopped and resumed_unique_frames > 0: common.plugin_info(f"Processing was stopped before writing a new chunk. Kept existing output at {output_path}") yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", "Stopped before a new chunk was written. Existing output kept.", continued=continued_mode, **_timing_kwargs()), output_path, self.ui_skip, start_enabled=True, abort_enabled=False) return if write_state.stopped: common.plugin_info("Processing was stopped before any output chunk was written.") yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", "Stopped before any output chunk was written.", continued=continued_mode, **_timing_kwargs()), self.ui_skip, self.ui_skip, start_enabled=True, abort_enabled=False) return raise gr.Error("Processing completed without creating an output file.") if self.active_job.get("cancel_requested"): write_state.stopped = True finalizing_message = "Finalizing written output before merge..." if continuation_output_path and os.path.isfile(write_state.output_path_for_write) else "Finalizing written output..." yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Finalizing Output", finalizing_message, continued=continued_mode, **_timing_kwargs()), output_path if os.path.isfile(output_path) else self.ui_skip, str(time.time_ns()), start_enabled=False, abort_enabled=False) return_code, stderr, forced_termination = write_state.finalize() if self.active_job.get("cancel_requested"): write_state.stopped = True if forced_termination: raise gr.Error("ffmpeg did not finalize the partial output in time.") if return_code != 0 and not (write_state.stopped and os.path.isfile(write_state.output_path_for_write if use_live_av_mux else write_state.video_only_output_path)): raise gr.Error(stderr or "ffmpeg failed while assembling the processed video.") if use_live_av_mux and os.path.isfile(write_state.output_path_for_write) and os.path.getsize(write_state.output_path_for_write) <= 0: media.delete_file_if_exists(write_state.output_path_for_write, label="continuation output") raise gr.Error("ffmpeg created an empty continuation file.") if not use_live_av_mux and os.path.isfile(write_state.video_only_output_path): try: os.replace(write_state.video_only_output_path, write_state.output_path_for_write) except OSError as exc: raise gr.Error(f"Unable to finalize the written video-only segment: {write_state.output_path_for_write}") from exc undeleted_merged_continuation_paths: list[str] = [] if continuation_output_path and os.path.isfile(write_state.output_path_for_write): continuation_signature = process_metadata.make_continuation_signature(write_state.output_path_for_write) try: existing_output_generation_time = process_metadata.read_metadata_generation_time(output_path) merged_duration_seconds = float(resumed_unique_frames + written_unique_frames) / fps_float committed_signatures = process_metadata.append_merged_continuation_signature(merged_continuation_signatures, continuation_signature) yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Merging Continuation", "Merging the continued segment into the main output...", continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()), start_enabled=False, abort_enabled=False) media.concat_video_segments( ffmpeg_path, [output_path, write_state.output_path_for_write], output_path, self.plugin.server_config.get("video_output_codec", "libx264_8"), output_container, self.plugin.server_config.get("audio_output_codec", "aac_128"), segment_audio_trim_seconds=[0.0, resume_audio_trim_seconds], segment_audio_duration_seconds=[(float(resumed_unique_frames) / fps_float) if resumed_unique_frames > 0 else None, (float(written_unique_frames) / fps_float) if written_unique_frames > 0 else None], fps_float=fps_float, selected_audio_track_no=selected_audio_track, reserved_metadata_path=write_state.reserved_metadata_path, source_audio_path=source_path if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None, source_audio_start_seconds=(start_frame / fps_float) if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None, source_audio_duration_seconds=merged_duration_seconds if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None, source_audio_track_no=selected_audio_track if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None, ) merged_continuation = True merged_continuation_signatures = committed_signatures process_metadata.store_process_progress(output_path, written_unique_frames=resumed_unique_frames + written_unique_frames, merged_signatures=merged_continuation_signatures, verbose_level=verbose_level) except media.ContinuationMergeOutputLockedError: locked_message = f"{Path(output_path).name} is open, so the continuation merge could not replace it. Existing output was kept and {Path(write_state.output_path_for_write).name} was preserved. Release the base file and start a process again." gr.Info(locked_message) media.delete_file_if_exists(write_state.reserved_metadata_path, label="reserved metadata file") yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Merge Pending", locked_message, continued=continued_mode, **_timing_kwargs()), write_state.output_path_for_write, str(time.time_ns()), start_enabled=True, abort_enabled=False) return except Exception as exc: raise gr.Error(f"Failed to finalize continued output. Existing output kept, and continuation was preserved at {continuation_output_path}. {exc}") from exc if os.path.isfile(write_state.output_path_for_write): try: os.remove(write_state.output_path_for_write) if system_handler is not None and callable(getattr(system_handler, "delete_continue_cache", None)): system_handler.delete_continue_cache(write_state.output_path_for_write) except OSError: undeleted_merged_continuation_paths.append(write_state.output_path_for_write) common.plugin_info(f"Merged continuation progress into {Path(output_path).name}, but {Path(write_state.output_path_for_write).name} could not be deleted because it is still open. Delete it manually when released.") else: existing_output_generation_time = 0.0 media.delete_file_if_exists(write_state.reserved_metadata_path, label="reserved metadata file") total_written_unique_frames = resumed_unique_frames + written_unique_frames if not write_state.stopped and total_written_unique_frames < requested_unique_frames: raise gr.Error(f"Processing wrote {total_written_unique_frames} frame(s), but {requested_unique_frames} frame(s) were required.") metadata_target_path = output_path if merged_continuation or not continuation_output_path else write_state.output_path_for_write yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Writing Metadata", "Writing final output metadata...", continued=continued_mode, **_timing_kwargs()), metadata_target_path if os.path.isfile(metadata_target_path) else output_path, str(time.time_ns()), start_enabled=False, abort_enabled=False) metadata_source_path = last_segment_path or media.get_last_generated_video_path(chunk_output_paths) or metadata_target_path actual_output_frames = media.probe_resume_frame_count(ffprobe_path, metadata_target_path, fps_float)[0] if os.path.isfile(metadata_target_path) else 0 if actual_output_frames <= 0: if metadata_target_path != output_path or not resume_existing_output: media.delete_file_if_exists(metadata_target_path, label="invalid output") raise gr.Error("Final output does not contain a readable video frame.") if actual_output_frames < total_written_unique_frames: if write_state.stopped: common.plugin_info(f"Stopped output contains {actual_output_frames} readable frame(s), lower than the {total_written_unique_frames} frame(s) attempted. Recording the probed frame count.") total_written_unique_frames = actual_output_frames else: raise gr.Error(f"Final output contains {actual_output_frames} readable frame(s), but {total_written_unique_frames} frame(s) were written.") total_generation_time = existing_output_generation_time + process_metadata.read_metadata_generation_time(metadata_source_path) if merged_continuation else process_metadata.read_metadata_generation_time(metadata_source_path) output_process_metadata = { "process": process_display_name, "written_unique_frames": int(total_written_unique_frames), "chunks": int(total_chunks_display), "sliding_window_overlap": int(overlap_frames), "start_seconds": float(start_seconds), "end_seconds": float(start_seconds + (total_written_unique_frames / float(fps_float))), "source_video": source_path, "source_segment": requested_source_segment, "merged_continuations": process_metadata.normalize_merged_continuation_signatures(merged_continuation_signatures), } if process_is_hdr: output_process_metadata["hdr"] = True metadata_written = process_metadata.store_output_metadata(metadata_target_path, metadata_source_path, source_path=source_path, process_name=process_display_name, source_start_seconds=start_seconds, start_frame=start_frame, fps_float=fps_float, selected_audio_track=selected_audio_track, total_generation_time=total_generation_time, actual_frame_count=actual_output_frames, process_metadata=output_process_metadata, verbose_level=verbose_level) completed_output = not write_state.stopped and (total_written_unique_frames >= requested_unique_frames or completed_chunks >= total_chunks_display) if system_handler is not None and completed_output and callable(getattr(system_handler, "delete_continue_cache", None)): for cache_output_path in dict.fromkeys([metadata_target_path, output_path, write_state.output_path_for_write]): if cache_output_path: system_handler.delete_continue_cache(cache_output_path) continue_cache = None elif system_handler is not None and continue_cache is not None and hasattr(system_handler, "save_continue_cache"): system_handler.save_continue_cache(continue_cache, metadata_target_path, metadata=output_process_metadata) if not metadata_written: raise gr.Error(f"Failed to write WanGP metadata to {metadata_target_path}. The partial output was kept, but continuation may require the sidecar cache.") chunk_output_paths = media.delete_released_chunk_outputs(request.state, chunk_output_paths) if write_state.stopped: stopped_output_path = output_path if merged_continuation: common.plugin_info(f"Processing was stopped. Merged continued progress into {output_path}") stop_message = f"Stopped after {total_written_unique_frames} frame(s). Continued progress was merged into the output." elif continuation_output_path and os.path.isfile(write_state.output_path_for_write): stopped_output_path = write_state.output_path_for_write common.plugin_info(f"Processing was stopped. Kept existing output at {output_path} and preserved continuation clip at {write_state.output_path_for_write}") stop_message = f"Stopped after {total_written_unique_frames} frame(s). Existing output kept and continuation clip preserved." else: common.plugin_info(f"Processing was stopped. Kept partial output at {output_path}") stop_message = f"Stopped after {total_written_unique_frames} frame(s). Partial output kept." yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", stop_message, continued=continued_mode, **_timing_kwargs()), stopped_output_path, self.ui_skip, start_enabled=True, abort_enabled=False) return yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, total_chunks_display, total_chunks_display, "Completed", f"Completed {total_chunks_display} chunk(s).", continued=continued_mode, **_timing_kwargs()), output_path, self.ui_skip, start_enabled=True, abort_enabled=False) self.active_job["job"] = None write_state.cleanup_partial_outputs() except gr.Error as exc: self.active_job["job"] = None write_state.cleanup_partial_outputs() status_message = common.get_error_message(exc) or "Processing failed." if not started_ui: gr.Info(status_message) return if started_ui: total_chunks_value = total_chunks_display completed_value = completed_chunks current_value = completed_chunks + 1 if completed_chunks < total_chunks_display else total_chunks_display continued_value = resumed_unique_frames > 0 output_value = output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip if preflight_stage: gr.Info(status_message) yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_value, completed_value, current_value, "Info" if preflight_stage else "Error", status_message, continued=continued_value), output_value, self.ui_skip, start_enabled=True, abort_enabled=False) return except BaseException as exc: self.active_job["job"] = None write_state.cleanup_partial_outputs() if started_ui: total_chunks_value = total_chunks_display completed_value = completed_chunks current_value = completed_chunks + 1 if completed_chunks < total_chunks_display else total_chunks_display continued_value = resumed_unique_frames > 0 status_message = common.get_error_message(exc) or exc.__class__.__name__ output_value = output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_value, completed_value, current_value, "Error", status_message, continued=continued_value), output_value, self.ui_skip, start_enabled=True, abort_enabled=False) raise finally: self.active_job["running"] = False self.active_job["write_state"] = None video.clear_process_full_video_source()