1ripon1's picture
Upload folder using huggingface_hub
7344bef verified
Raw
History Blame Contribute Delete
50 kB
from __future__ import annotations
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
import gradio as gr
from shared.utils.hdr import VIDEO_PROMPT_HDR_OUTPUT_FLAG
from .chunk_executor import ChunkExecutor, ChunkProgress, ProcessContext
from . import common
from . import continuation_recovery
from . import frame_planning as frames
from . import media_io as media
from . import output_paths
from . import process_catalog as catalog
from . import process_metadata
from . import prompt_schedule as prompts
from . import status_ui
from . import video_buffers as video
from .mux_session import MuxSession
from .run_preparation import ProcessInfoExit, prepare_run
@dataclass(frozen=True)
class RunRequest:
state: dict | None = None
process_name: str = ""
user_refs: list[str] = field(default_factory=list)
source_path: str = ""
process_strength: object = None
output_path: str = ""
prompt_text: str = ""
continue_enabled: bool = True
source_audio_track: str = ""
output_resolution: str = "720p"
target_ratio: str = ""
chunk_size_seconds: object = 10.0
sliding_window_overlap: object = 1
start_seconds: str = ""
end_seconds: str = ""
@classmethod
def from_gradio(
cls,
state=None,
process_name="",
user_refs=None,
source_path="",
process_strength=None,
output_path="",
prompt_text="",
continue_enabled=True,
source_audio_track="",
output_resolution="720p",
target_ratio="",
chunk_size_seconds=10.0,
sliding_window_overlap=1,
start_seconds="",
end_seconds="",
) -> "RunRequest":
return cls(
state=state,
process_name=str(process_name or "").strip(),
user_refs=list(user_refs or []),
source_path=str(source_path or "").strip(),
process_strength=process_strength,
output_path=str(output_path or "").strip(),
prompt_text=str(prompt_text or ""),
continue_enabled=bool(continue_enabled),
source_audio_track=str(source_audio_track or "").strip(),
output_resolution=str(output_resolution or "720p").strip(),
target_ratio=str(target_ratio or "").strip(),
chunk_size_seconds=chunk_size_seconds,
sliding_window_overlap=sliding_window_overlap,
start_seconds="" if start_seconds in (None, "") else str(start_seconds),
end_seconds="" if end_seconds in (None, "") else str(end_seconds),
)
class ProcessRunner:
def __init__(self, *, plugin, api_session, library, get_model_def, active_job: dict, preview_state: dict, ui_skip, ui_update, info_exit, reset_live_chunk_status) -> None:
self.plugin = plugin
self.api_session = api_session
self.library = library
self.get_model_def = get_model_def
self.active_job = active_job
self.preview_state = preview_state
self.ui_skip = ui_skip
self.ui_update = ui_update
self.info_exit = info_exit
self.reset_live_chunk_status = reset_live_chunk_status
def start_process(self, state=None, process_name="", user_refs=None, source_path="", process_strength=None, output_path="", prompt_text="", continue_enabled=True, source_audio_track="", output_resolution="720p", target_ratio="", chunk_size_seconds=10.0, sliding_window_overlap=1, start_seconds="", end_seconds=""):
if self.active_job.get("running"):
yield self.info_exit("A process is already running.")
return
request = RunRequest.from_gradio(state, process_name, user_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds)
process_definition = self.library.process_definition(request.process_name, request.state, request.user_refs)
if process_definition is None:
yield self.info_exit(f"Unsupported process: {request.process_name}")
return
system_handler = self.library.system_handler_for_definition(process_definition)
if process_definition.get("source") == "user":
problems = self.library.validate_user_process_definition(process_definition)
if len(problems) > 0:
yield self.info_exit(self.library.format_user_process_validation_error(process_definition, problems))
return
process_settings = process_definition["settings"]
model_type = str(process_settings.get("model_type") or "")
if len(model_type) == 0 and system_handler is None:
yield self.info_exit(f"Unsupported process: {request.process_name}")
return
process_display_name = str(process_definition.get("name") or request.process_name or "").strip()
process_is_hdr = False if system_handler is not None else VIDEO_PROMPT_HDR_OUTPUT_FLAG in str(process_settings.get("video_prompt_type") or "")
is_user_process = process_definition.get("source") == "user"
has_outpaint_setting = "video_guide_outpainting" in process_settings
uses_builtin_outpaint_ui = self.library.uses_builtin_outpaint_ui(process_definition)
user_lora_strength_override_default = self.library.user_lora_strength_override_default(process_definition)
use_lora_strength_override = system_handler is None and not uses_builtin_outpaint_ui and (not is_user_process or user_lora_strength_override_default is not None)
process_strength_default = user_lora_strength_override_default if user_lora_strength_override_default is not None else common.get_default_process_strength(process_settings)
active_process_strength = 1.0 if uses_builtin_outpaint_ui else (common.coerce_float(request.process_strength, process_strength_default) if use_lora_strength_override else process_strength_default)
source_path = request.source_path
output_path = request.output_path
source_audio_track = request.source_audio_track
output_resolution = request.output_resolution
target_ratio = request.target_ratio
prompt_text = request.prompt_text
start_seconds = request.start_seconds
end_seconds = request.end_seconds
if system_handler is not None and callable(getattr(system_handler, "normalize_target_control_for_process", None)):
system_target_control = system_handler.normalize_target_control_for_process(request.target_ratio, process_settings)
else:
system_target_control = system_handler.normalize_target_control(request.target_ratio) if system_handler is not None and hasattr(system_handler, "normalize_target_control") else ""
system_supports_continue_cache = system_handler is not None and (not callable(getattr(system_handler, "supports_continue_cache_for_target", None)) or system_handler.supports_continue_cache_for_target(system_target_control))
try:
chunk_size_seconds = common.require_float(request.chunk_size_seconds, "Chunk Size", minimum=0.1)
sliding_window_overlap = int(getattr(system_handler, "overlap_frames", 0)) if system_handler is not None else common.require_int(request.sliding_window_overlap, "Sliding Window Overlap", minimum=1)
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or "Invalid processing settings.")
return
try:
catalog.save_process_full_video_ui_settings({
"process_model_type": model_type,
"process_name": request.process_name,
"source_path": source_path,
"process_strength": active_process_strength,
"output_path": output_path,
"prompt": prompt_text,
"continue_enabled": request.continue_enabled,
"source_audio_track": source_audio_track,
"output_resolution": output_resolution,
"target_ratio": system_target_control if system_handler is not None else target_ratio,
"chunk_size_seconds": chunk_size_seconds,
"sliding_window_overlap": sliding_window_overlap,
"start_seconds": start_seconds,
"end_seconds": end_seconds,
})
except OSError as exc:
yield self.info_exit(f"Unable to save plugin settings to {catalog.PROCESS_FULL_VIDEO_SETTINGS_FILE}: {exc}")
return
active_target_ratio = target_ratio if uses_builtin_outpaint_ui else ""
default_prompt_text = str(process_settings.get("prompt") or "")
if len(prompt_text.strip()) == 0:
prompt_text = default_prompt_text
if not os.path.isfile(source_path):
yield self.info_exit(f"Source video not found: {source_path}")
return
try:
start_seconds = prompts.parse_time_input(start_seconds, label="Start", allow_empty=False)
end_seconds = prompts.parse_time_input(end_seconds, label="End", allow_empty=True)
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or "Invalid start/end selection.")
return
try:
prompt_schedule = prompts.parse_prompt_schedule(prompt_text)
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or f"Invalid prompt syntax.\n\nExample:\n{prompts.TIMED_PROMPT_EXAMPLE}")
return
started_ui = False
preflight_stage = True
write_state = MuxSession()
total_chunks_display = 1
completed_chunks = 0
resumed_unique_frames = 0
self.active_job["cancel_requested"] = False
self.active_job["write_state"] = write_state
self.active_job["running"] = True
try:
video.clear_process_full_video_source()
yield self.ui_update(status_ui.render_chunk_status_html(1, 0, 1, "Initializing", "Preparing processing job..."), self.ui_skip, str(time.time_ns()), start_enabled=False, abort_enabled=False)
started_ui = True
try:
prepared_run = prepare_run(
plugin=self.plugin,
get_model_def=self.get_model_def,
process_name=request.process_name,
process_display_name=process_display_name,
source_path=source_path,
output_path=output_path,
output_resolution=output_resolution,
active_target_ratio=active_target_ratio,
continue_enabled=request.continue_enabled,
source_audio_track=source_audio_track,
chunk_size_seconds=chunk_size_seconds,
sliding_window_overlap=sliding_window_overlap,
start_seconds=start_seconds,
end_seconds=end_seconds,
model_type=model_type,
process_is_hdr=process_is_hdr,
uses_builtin_outpaint_ui=uses_builtin_outpaint_ui,
system_handler=system_handler,
system_target_control=system_target_control,
)
verbose_level = prepared_run.verbose_level
start_frame = prepared_run.start_frame
end_frame_exclusive = prepared_run.end_frame_exclusive
fps_float = prepared_run.fps_float
total_source_frames = prepared_run.total_source_frames
processing_fps = prepared_run.processing_fps
selected_audio_track = prepared_run.selected_audio_track
frame_plan_rules = prepared_run.frame_plan_rules
budget_resolution = prepared_run.budget_resolution
chunk_frames = prepared_run.chunk_frames
overlap_frames = prepared_run.overlap_frames
full_plans = prepared_run.full_plans
requested_unique_frames = prepared_run.requested_unique_frames
requested_source_segment = prepared_run.requested_source_segment
output_path = prepared_run.output_path
resume_existing_output = prepared_run.resume_existing_output
ffmpeg_path = prepared_run.ffmpeg_path
ffprobe_path = prepared_run.ffprobe_path
output_container = prepared_run.output_container
merged_continuation_signatures = prepared_run.merged_continuation_signatures
except ProcessInfoExit as exc:
yield self.info_exit(exc.message, output=exc.output_path or self.ui_skip)
return
except gr.Error as exc:
yield self.info_exit(common.get_error_message(exc) or "Invalid process settings.", output=output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip)
return
preflight_stage = False
if resume_existing_output:
recovery_result = yield from continuation_recovery.recover_residual_continuations(
plugin=self.plugin,
ffmpeg_path=ffmpeg_path,
ffprobe_path=ffprobe_path,
output_path=output_path,
full_plans=full_plans,
output_container=output_container,
fps_float=fps_float,
selected_audio_track=selected_audio_track,
source_path=source_path,
start_frame=start_frame,
merged_signatures=merged_continuation_signatures,
verbose_level=verbose_level,
ui_update=self.ui_update,
ui_skip=self.ui_skip,
system_handler=system_handler,
system_supports_continue_cache=system_supports_continue_cache,
)
merged_continuation_signatures = recovery_result.merged_signatures
if recovery_result.blocked:
return
if self.active_job.get("cancel_requested"):
write_state.stopped = True
yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans) if len(full_plans) > 0 else 1, 0, 1, "Stopped", "Stopped before processing a new chunk."), output_path if os.path.isfile(output_path) else self.ui_skip, self.ui_skip, start_enabled=True, abort_enabled=False)
return
last_frame_image = None
last_segment_path = None
continuation_output_path = ""
chunk_output_paths: list[str] = []
written_unique_frames = 0
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
resolved_resolution = ""
resolved_width = 0
resolved_height = 0
merged_continuation = False
resume_audio_trim_seconds = 0.0
continue_cache = None
self.preview_state["image"] = None
write_state.set_output_path(output_path)
exact_start_seconds = start_frame / fps_float
if resume_existing_output:
yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans), 0, 1, "Inspecting Existing Output", f"Inspecting existing output to continue: {output_path}"), output_path, str(time.time_ns()))
process_metadata.log_existing_output_metadata(output_path, verbose_level)
resumed_unique_frames, resume_reason = media.probe_resume_frame_count(ffprobe_path, output_path, fps_float)
recorded_written_unique_frames = process_metadata.read_recorded_written_unique_frames(output_path)
if 0 < recorded_written_unique_frames < resumed_unique_frames:
common.plugin_info(f"Output contains {resumed_unique_frames} readable frame(s), but metadata recorded {recorded_written_unique_frames}. Using the real frame count for continuation.")
process_metadata.store_process_progress(output_path, written_unique_frames=resumed_unique_frames, merged_signatures=merged_continuation_signatures, verbose_level=verbose_level)
elif recorded_written_unique_frames > resumed_unique_frames:
common.plugin_info(f"Ignoring recorded output progress of {recorded_written_unique_frames} frame(s) because the output only probes as {resumed_unique_frames} frame(s).")
if resumed_unique_frames < 0:
common.plugin_info(f"Ignoring negative probed output progress from {output_path}.")
resumed_unique_frames = 0
elif resumed_unique_frames > requested_unique_frames:
common.plugin_info(f"Existing output already covers the requested {requested_unique_frames} frame(s).")
resumed_unique_frames = requested_unique_frames
if resumed_unique_frames <= 0:
common.plugin_info(f"Unable to continue from existing output: {output_path}. {resume_reason or 'Starting a new file instead.'}")
output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info)
write_state.set_output_path(output_path)
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
exact_start_seconds = start_frame / fps_float
resume_existing_output = False
else:
common.plugin_info(f"Continuing existing output: {output_path}")
resolved_resolution, resolved_width, resolved_height = video.probe_existing_output_resolution(output_path)
print(f"[Process Full Video] Continuing with locked output resolution {resolved_resolution}")
completed_chunks, _ = (frames.count_completed_written_chunks if system_handler is not None else frames.count_completed_chunks)(full_plans, resumed_unique_frames)
exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float
if resumed_unique_frames < requested_unique_frames:
resume_phase = "Planning Source Overlap" if system_handler is not None else "Loading Overlap Frames"
yield self.ui_update(status_ui.render_chunk_status_html(len(full_plans), 0, 1, resume_phase, f"Continuing existing output with {resumed_unique_frames} frame(s) already written."), output_path, str(time.time_ns()))
checked_unique_frames, last_frame_image, tail_reason = video.resolve_resume_last_frame(output_path, resumed_unique_frames)
if system_handler is not None:
if checked_unique_frames > 0:
resumed_unique_frames = checked_unique_frames
if tail_reason:
common.plugin_info(tail_reason)
if last_frame_image is not None:
self.preview_state["image"] = last_frame_image
if system_supports_continue_cache and hasattr(system_handler, "load_continue_cache"):
sidecar_exists = callable(getattr(system_handler, "cache_sidecar_path", None)) and Path(system_handler.cache_sidecar_path(output_path)).is_file()
if sidecar_exists or not callable(getattr(system_handler, "continue_cache_from_tail_frames", None)):
continue_cache = system_handler.load_continue_cache(output_path)
else:
fallback_frames = min(int(getattr(system_handler, "overlap_frames", 0)), int(resumed_unique_frames))
fallback_tail = video.load_process_full_video_overlap_buffer(output_path, fallback_frames, resumed_unique_frames)
continue_cache = system_handler.continue_cache_from_tail_frames(fallback_tail, system_target_control)
if continue_cache is None:
continue_cache = system_handler.load_continue_cache(output_path)
common.plugin_info("FlashVSR continuation sidecar is missing; continuing from decoded output tail frames. The first resumed chunk may have lower temporal continuity.")
completed_chunks, _ = frames.count_completed_written_chunks(full_plans, resumed_unique_frames)
exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float
resume_overlap_frames = 0
if resumed_unique_frames < requested_unique_frames:
print(f"[Process Full Video] Continuing system process from source frame {start_frame + resumed_unique_frames}" + (" using continue cache" if system_supports_continue_cache else ""))
elif checked_unique_frames <= 0 or last_frame_image is None:
common.plugin_info(f"Unable to continue from existing output: {output_path}. {tail_reason or 'Starting a new file instead.'}")
output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info)
write_state.set_output_path(output_path)
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
self.preview_state["image"] = None
exact_start_seconds = start_frame / fps_float
resume_existing_output = False
else:
resumed_unique_frames = checked_unique_frames
if tail_reason:
common.plugin_info(tail_reason)
self.preview_state["image"] = last_frame_image
completed_chunks, _ = frames.count_completed_chunks(full_plans, resumed_unique_frames)
exact_start_seconds = (start_frame + resumed_unique_frames) / fps_float
resume_overlap_frames = overlap_frames
if resumed_unique_frames < resume_overlap_frames:
resume_overlap_frames = resumed_unique_frames
overlap_tensor = video.load_process_full_video_hdr_overlap_buffer(output_path, resume_overlap_frames, resumed_unique_frames) if process_is_hdr else video.load_process_full_video_overlap_buffer(output_path, resume_overlap_frames, resumed_unique_frames)
if overlap_tensor is None:
common.plugin_info(f"Unable to continue from existing output: {output_path}. Failed to load the overlap frames from the recorded output.")
output_path = output_paths.make_output_variant(Path(output_path), notify=common.plugin_info)
write_state.set_output_path(output_path)
resumed_unique_frames = 0
resume_overlap_frames = 0
completed_chunks = 0
self.preview_state["image"] = None
exact_start_seconds = start_frame / fps_float
resume_existing_output = False
else:
resume_overlap_frames = int(overlap_tensor.shape[1])
video.set_process_full_video_overlap_buffer(overlap_tensor, processing_fps, hdr=process_is_hdr)
print(f"[Process Full Video] Loaded overlap buffer from existing output: {frames.describe_frame_range(start_frame + resumed_unique_frames - resume_overlap_frames, resume_overlap_frames)}")
if resume_existing_output and resumed_unique_frames < requested_unique_frames:
if not continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE:
resume_audio_trim_seconds = media.probe_selected_audio_overhang(ffprobe_path, output_path, selected_audio_track, resumed_unique_frames / fps_float)
if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None:
print(f"[Process Full Video] Final merge: rebuilding audio from source starting at {start_frame / fps_float:.6f}s")
elif resume_audio_trim_seconds > 0.0:
print(f"[Process Full Video] Final merge: trimming {resume_audio_trim_seconds:.6f}s from continuation audio and clamping segment audio to visible video duration")
remaining_resume_unique_frames = frames.align_total_unique_frames(
end_frame_exclusive - (start_frame + resumed_unique_frames),
frame_step=frame_plan_rules.frame_step,
minimum_requested_frames=frame_plan_rules.minimum_requested_frames,
initial_overlap_frames=resume_overlap_frames,
)
if remaining_resume_unique_frames <= 0:
trailing_frames = requested_unique_frames - resumed_unique_frames
common.plugin_info(f"Existing output has {resumed_unique_frames} frame(s). The remaining {trailing_frames} frame(s) are too short to build another continuation chunk for the current model, so the existing output is treated as complete.")
resumed_unique_frames = requested_unique_frames
completed_chunks = len(full_plans)
plans = []
else:
continuation_output_path = output_paths.make_continuation_output_path(output_path)
write_state.set_output_path(continuation_output_path)
try:
plans = frames.build_chunk_plan(
start_frame + resumed_unique_frames,
end_frame_exclusive,
total_source_frames,
chunk_frames,
frame_step=frame_plan_rules.frame_step,
minimum_requested_frames=frame_plan_rules.minimum_requested_frames,
overlap_frames=overlap_frames,
initial_overlap_frames=resume_overlap_frames,
)
except frames.FramePlanningError as exc:
raise gr.Error(str(exc)) from exc
elif resume_existing_output:
plans = []
if not resume_existing_output:
plans = full_plans
continued_mode = resumed_unique_frames > 0
use_live_av_mux = selected_audio_track is not None
total_chunks_display = completed_chunks + len(plans)
current_chunk_display = completed_chunks + 1
run_started_at = time.time()
initial_completed_chunks = completed_chunks
def _timing_kwargs(current_completed_chunks=None, phase_current_step=None, phase_total_steps=None):
elapsed_seconds = time.time() - run_started_at
if elapsed_seconds < 0.0:
elapsed_seconds = 0.0
if len(plans) == 0:
return {"elapsed_seconds": elapsed_seconds, "eta_seconds": None}
completed_for_eta = completed_chunks if current_completed_chunks is None else current_completed_chunks
run_completed_chunks = completed_for_eta - initial_completed_chunks
phase_ratio = 0.0
if phase_current_step is not None and phase_total_steps is not None and phase_total_steps > 0:
phase_ratio = float(phase_current_step) / float(phase_total_steps)
overall_ratio = (run_completed_chunks + phase_ratio) / float(len(plans))
eta_seconds = None if overall_ratio <= 0.0 or overall_ratio >= 1.0 else elapsed_seconds * (1.0 - overall_ratio) / overall_ratio
return {"elapsed_seconds": elapsed_seconds, "eta_seconds": eta_seconds}
if len(plans) == 0:
if system_handler is not None and callable(getattr(system_handler, "delete_continue_cache", None)):
system_handler.delete_continue_cache(output_path)
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, completed_chunks, "Completed", "Existing output already covers the requested range.", continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()), start_enabled=True, abort_enabled=False)
return
planning_text = f"Resuming from {resumed_unique_frames} frame(s) already written." if resumed_unique_frames > 0 else f"Preparing {len(plans)} chunk(s)..."
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Planning", planning_text, continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()))
chunk_progress = ChunkProgress(
completed_chunks=completed_chunks,
current_chunk_display=current_chunk_display,
chunk_output_paths=chunk_output_paths,
last_segment_path=last_segment_path,
write_state=write_state,
resolved_resolution=resolved_resolution,
resolved_width=resolved_width,
resolved_height=resolved_height,
continue_cache=continue_cache,
)
continue_cache = None
chunk_result = yield from ChunkExecutor(
plugin=self.plugin,
api_session=self.api_session,
active_job=self.active_job,
preview_state=self.preview_state,
ui_update=self.ui_update,
ui_skip=self.ui_skip,
reset_live_chunk_status=self.reset_live_chunk_status,
).run(ProcessContext(
state=request.state,
process_settings=process_definition["settings"],
model_type=model_type,
process_is_hdr=process_is_hdr,
is_user_process=is_user_process,
has_outpaint_setting=has_outpaint_setting,
uses_builtin_outpaint_ui=uses_builtin_outpaint_ui,
use_lora_strength_override=use_lora_strength_override,
active_process_strength=active_process_strength,
active_target_ratio=active_target_ratio,
source_path=source_path,
output_path=output_path,
selected_audio_track=selected_audio_track,
prompt_schedule=prompt_schedule,
default_prompt_text=default_prompt_text,
budget_resolution=budget_resolution,
start_frame=start_frame,
resumed_unique_frames=resumed_unique_frames,
requested_unique_frames=requested_unique_frames,
overlap_frames=overlap_frames,
processing_fps=processing_fps,
fps_float=fps_float,
continued_mode=continued_mode,
plans=plans,
total_chunks_display=total_chunks_display,
ffmpeg_path=ffmpeg_path,
use_live_av_mux=use_live_av_mux,
output_container=output_container,
exact_start_seconds=exact_start_seconds,
timing_kwargs=_timing_kwargs,
system_handler=system_handler,
system_target_control=system_target_control,
), chunk_progress)
written_unique_frames = chunk_result.written_unique_frames
completed_chunks = chunk_result.completed_chunks
current_chunk_display = chunk_result.current_chunk_display
resolved_resolution = chunk_result.resolved_resolution
resolved_width = chunk_result.resolved_width
resolved_height = chunk_result.resolved_height
last_segment_path = chunk_result.last_segment_path
chunk_output_paths = chunk_result.chunk_output_paths
continue_cache = chunk_result.continue_cache
if write_state.mux_process is None:
if write_state.stopped and resumed_unique_frames > 0:
common.plugin_info(f"Processing was stopped before writing a new chunk. Kept existing output at {output_path}")
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", "Stopped before a new chunk was written. Existing output kept.", continued=continued_mode, **_timing_kwargs()), output_path, self.ui_skip, start_enabled=True, abort_enabled=False)
return
if write_state.stopped:
common.plugin_info("Processing was stopped before any output chunk was written.")
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", "Stopped before any output chunk was written.", continued=continued_mode, **_timing_kwargs()), self.ui_skip, self.ui_skip, start_enabled=True, abort_enabled=False)
return
raise gr.Error("Processing completed without creating an output file.")
if self.active_job.get("cancel_requested"):
write_state.stopped = True
finalizing_message = "Finalizing written output before merge..." if continuation_output_path and os.path.isfile(write_state.output_path_for_write) else "Finalizing written output..."
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Finalizing Output", finalizing_message, continued=continued_mode, **_timing_kwargs()), output_path if os.path.isfile(output_path) else self.ui_skip, str(time.time_ns()), start_enabled=False, abort_enabled=False)
return_code, stderr, forced_termination = write_state.finalize()
if self.active_job.get("cancel_requested"):
write_state.stopped = True
if forced_termination:
raise gr.Error("ffmpeg did not finalize the partial output in time.")
if return_code != 0 and not (write_state.stopped and os.path.isfile(write_state.output_path_for_write if use_live_av_mux else write_state.video_only_output_path)):
raise gr.Error(stderr or "ffmpeg failed while assembling the processed video.")
if use_live_av_mux and os.path.isfile(write_state.output_path_for_write) and os.path.getsize(write_state.output_path_for_write) <= 0:
media.delete_file_if_exists(write_state.output_path_for_write, label="continuation output")
raise gr.Error("ffmpeg created an empty continuation file.")
if not use_live_av_mux and os.path.isfile(write_state.video_only_output_path):
try:
os.replace(write_state.video_only_output_path, write_state.output_path_for_write)
except OSError as exc:
raise gr.Error(f"Unable to finalize the written video-only segment: {write_state.output_path_for_write}") from exc
undeleted_merged_continuation_paths: list[str] = []
if continuation_output_path and os.path.isfile(write_state.output_path_for_write):
continuation_signature = process_metadata.make_continuation_signature(write_state.output_path_for_write)
try:
existing_output_generation_time = process_metadata.read_metadata_generation_time(output_path)
merged_duration_seconds = float(resumed_unique_frames + written_unique_frames) / fps_float
committed_signatures = process_metadata.append_merged_continuation_signature(merged_continuation_signatures, continuation_signature)
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Merging Continuation", "Merging the continued segment into the main output...", continued=continued_mode, **_timing_kwargs()), output_path, str(time.time_ns()), start_enabled=False, abort_enabled=False)
media.concat_video_segments(
ffmpeg_path,
[output_path, write_state.output_path_for_write],
output_path,
self.plugin.server_config.get("video_output_codec", "libx264_8"),
output_container,
self.plugin.server_config.get("audio_output_codec", "aac_128"),
segment_audio_trim_seconds=[0.0, resume_audio_trim_seconds],
segment_audio_duration_seconds=[(float(resumed_unique_frames) / fps_float) if resumed_unique_frames > 0 else None, (float(written_unique_frames) / fps_float) if written_unique_frames > 0 else None],
fps_float=fps_float,
selected_audio_track_no=selected_audio_track,
reserved_metadata_path=write_state.reserved_metadata_path,
source_audio_path=source_path if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
source_audio_start_seconds=(start_frame / fps_float) if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
source_audio_duration_seconds=merged_duration_seconds if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
source_audio_track_no=selected_audio_track if continuation_recovery.USE_SOURCE_AUDIO_FOR_CONTINUATION_MERGE and selected_audio_track is not None else None,
)
merged_continuation = True
merged_continuation_signatures = committed_signatures
process_metadata.store_process_progress(output_path, written_unique_frames=resumed_unique_frames + written_unique_frames, merged_signatures=merged_continuation_signatures, verbose_level=verbose_level)
except media.ContinuationMergeOutputLockedError:
locked_message = f"{Path(output_path).name} is open, so the continuation merge could not replace it. Existing output was kept and {Path(write_state.output_path_for_write).name} was preserved. Release the base file and start a process again."
gr.Info(locked_message)
media.delete_file_if_exists(write_state.reserved_metadata_path, label="reserved metadata file")
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Merge Pending", locked_message, continued=continued_mode, **_timing_kwargs()), write_state.output_path_for_write, str(time.time_ns()), start_enabled=True, abort_enabled=False)
return
except Exception as exc:
raise gr.Error(f"Failed to finalize continued output. Existing output kept, and continuation was preserved at {continuation_output_path}. {exc}") from exc
if os.path.isfile(write_state.output_path_for_write):
try:
os.remove(write_state.output_path_for_write)
if system_handler is not None and callable(getattr(system_handler, "delete_continue_cache", None)):
system_handler.delete_continue_cache(write_state.output_path_for_write)
except OSError:
undeleted_merged_continuation_paths.append(write_state.output_path_for_write)
common.plugin_info(f"Merged continuation progress into {Path(output_path).name}, but {Path(write_state.output_path_for_write).name} could not be deleted because it is still open. Delete it manually when released.")
else:
existing_output_generation_time = 0.0
media.delete_file_if_exists(write_state.reserved_metadata_path, label="reserved metadata file")
total_written_unique_frames = resumed_unique_frames + written_unique_frames
if not write_state.stopped and total_written_unique_frames < requested_unique_frames:
raise gr.Error(f"Processing wrote {total_written_unique_frames} frame(s), but {requested_unique_frames} frame(s) were required.")
metadata_target_path = output_path if merged_continuation or not continuation_output_path else write_state.output_path_for_write
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Writing Metadata", "Writing final output metadata...", continued=continued_mode, **_timing_kwargs()), metadata_target_path if os.path.isfile(metadata_target_path) else output_path, str(time.time_ns()), start_enabled=False, abort_enabled=False)
metadata_source_path = last_segment_path or media.get_last_generated_video_path(chunk_output_paths) or metadata_target_path
actual_output_frames = media.probe_resume_frame_count(ffprobe_path, metadata_target_path, fps_float)[0] if os.path.isfile(metadata_target_path) else 0
if actual_output_frames <= 0:
if metadata_target_path != output_path or not resume_existing_output:
media.delete_file_if_exists(metadata_target_path, label="invalid output")
raise gr.Error("Final output does not contain a readable video frame.")
if actual_output_frames < total_written_unique_frames:
if write_state.stopped:
common.plugin_info(f"Stopped output contains {actual_output_frames} readable frame(s), lower than the {total_written_unique_frames} frame(s) attempted. Recording the probed frame count.")
total_written_unique_frames = actual_output_frames
else:
raise gr.Error(f"Final output contains {actual_output_frames} readable frame(s), but {total_written_unique_frames} frame(s) were written.")
total_generation_time = existing_output_generation_time + process_metadata.read_metadata_generation_time(metadata_source_path) if merged_continuation else process_metadata.read_metadata_generation_time(metadata_source_path)
output_process_metadata = {
"process": process_display_name,
"written_unique_frames": int(total_written_unique_frames),
"chunks": int(total_chunks_display),
"sliding_window_overlap": int(overlap_frames),
"start_seconds": float(start_seconds),
"end_seconds": float(start_seconds + (total_written_unique_frames / float(fps_float))),
"source_video": source_path,
"source_segment": requested_source_segment,
"merged_continuations": process_metadata.normalize_merged_continuation_signatures(merged_continuation_signatures),
}
if process_is_hdr:
output_process_metadata["hdr"] = True
metadata_written = process_metadata.store_output_metadata(metadata_target_path, metadata_source_path, source_path=source_path, process_name=process_display_name, source_start_seconds=start_seconds, start_frame=start_frame, fps_float=fps_float, selected_audio_track=selected_audio_track, total_generation_time=total_generation_time, actual_frame_count=actual_output_frames, process_metadata=output_process_metadata, verbose_level=verbose_level)
completed_output = not write_state.stopped and (total_written_unique_frames >= requested_unique_frames or completed_chunks >= total_chunks_display)
if system_handler is not None and completed_output and callable(getattr(system_handler, "delete_continue_cache", None)):
for cache_output_path in dict.fromkeys([metadata_target_path, output_path, write_state.output_path_for_write]):
if cache_output_path:
system_handler.delete_continue_cache(cache_output_path)
continue_cache = None
elif system_handler is not None and continue_cache is not None and hasattr(system_handler, "save_continue_cache"):
system_handler.save_continue_cache(continue_cache, metadata_target_path, metadata=output_process_metadata)
if not metadata_written:
raise gr.Error(f"Failed to write WanGP metadata to {metadata_target_path}. The partial output was kept, but continuation may require the sidecar cache.")
chunk_output_paths = media.delete_released_chunk_outputs(request.state, chunk_output_paths)
if write_state.stopped:
stopped_output_path = output_path
if merged_continuation:
common.plugin_info(f"Processing was stopped. Merged continued progress into {output_path}")
stop_message = f"Stopped after {total_written_unique_frames} frame(s). Continued progress was merged into the output."
elif continuation_output_path and os.path.isfile(write_state.output_path_for_write):
stopped_output_path = write_state.output_path_for_write
common.plugin_info(f"Processing was stopped. Kept existing output at {output_path} and preserved continuation clip at {write_state.output_path_for_write}")
stop_message = f"Stopped after {total_written_unique_frames} frame(s). Existing output kept and continuation clip preserved."
else:
common.plugin_info(f"Processing was stopped. Kept partial output at {output_path}")
stop_message = f"Stopped after {total_written_unique_frames} frame(s). Partial output kept."
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, completed_chunks, current_chunk_display, "Stopped", stop_message, continued=continued_mode, **_timing_kwargs()), stopped_output_path, self.ui_skip, start_enabled=True, abort_enabled=False)
return
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_display, total_chunks_display, total_chunks_display, "Completed", f"Completed {total_chunks_display} chunk(s).", continued=continued_mode, **_timing_kwargs()), output_path, self.ui_skip, start_enabled=True, abort_enabled=False)
self.active_job["job"] = None
write_state.cleanup_partial_outputs()
except gr.Error as exc:
self.active_job["job"] = None
write_state.cleanup_partial_outputs()
status_message = common.get_error_message(exc) or "Processing failed."
if not started_ui:
gr.Info(status_message)
return
if started_ui:
total_chunks_value = total_chunks_display
completed_value = completed_chunks
current_value = completed_chunks + 1 if completed_chunks < total_chunks_display else total_chunks_display
continued_value = resumed_unique_frames > 0
output_value = output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip
if preflight_stage:
gr.Info(status_message)
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_value, completed_value, current_value, "Info" if preflight_stage else "Error", status_message, continued=continued_value), output_value, self.ui_skip, start_enabled=True, abort_enabled=False)
return
except BaseException as exc:
self.active_job["job"] = None
write_state.cleanup_partial_outputs()
if started_ui:
total_chunks_value = total_chunks_display
completed_value = completed_chunks
current_value = completed_chunks + 1 if completed_chunks < total_chunks_display else total_chunks_display
continued_value = resumed_unique_frames > 0
status_message = common.get_error_message(exc) or exc.__class__.__name__
output_value = output_path if isinstance(output_path, str) and os.path.isfile(output_path) else self.ui_skip
yield self.ui_update(status_ui.render_chunk_status_html(total_chunks_value, completed_value, current_value, "Error", status_message, continued=continued_value), output_value, self.ui_skip, start_enabled=True, abort_enabled=False)
raise
finally:
self.active_job["running"] = False
self.active_job["write_state"] = None
video.clear_process_full_video_source()