| from __future__ import annotations |
|
|
| import html |
| from pathlib import Path |
|
|
| import gradio as gr |
|
|
| from shared.gradio.local_file_picker import LocalFilePickerTextbox, VIDEO_FILE_EXTENSIONS |
|
|
| from . import common |
| from . import constants as ui_constants |
| from . import process_catalog as catalog |
| from . import prompt_schedule as prompts |
| from . import status_ui |
| from .form_controller import FormComponentValues, ProcessFormController |
| from .process_library import ProcessLibrary |
| from .process_runner import ProcessRunner |
|
|
|
|
| def create_config_ui(self, api_session): |
| if catalog.PROCESS_DEFINITIONS_ERROR is not None: |
| with gr.Blocks() as plugin_blocks: |
| gr.Markdown(f"Process settings configuration error: {html.escape(catalog.PROCESS_DEFINITIONS_ERROR)}") |
| return plugin_blocks |
| get_model_def = self.get_model_def |
| get_lora_dir = self.get_lora_dir |
| get_base_model_type = self.get_base_model_type |
| library = ProcessLibrary(get_model_def=get_model_def, get_lora_dir=get_lora_dir, get_base_model_type=get_base_model_type) |
| output_resolution_choices = [("1080p", "1080p"), ("900p", "900p"), ("720p", "720p"), ("540p", "540p"), ("480p", "480p"), ("384p", "384p"), ("320p", "320p"), ("256p", "256p")] |
| output_resolution_values = {value for _, value in output_resolution_choices} |
| source_audio_track_choices = [("Auto", "")] + [(f"Audio Track {track_no}", str(track_no)) for track_no in range(1, 10)] |
| source_audio_track_values = {value for _, value in source_audio_track_choices} |
| ratio_values = {value for _, value in ui_constants.RATIO_CHOICES} |
|
|
| form_controller = ProcessFormController(library=library, get_model_def=get_model_def, output_resolution_values=output_resolution_values, source_audio_track_values=source_audio_track_values, ratio_values=ratio_values) |
| saved_ui_settings = catalog.load_saved_process_full_video_settings() |
| initial_user_refs = catalog.get_saved_user_settings_refs(saved_ui_settings) |
| initial_form = form_controller.build_initial_form(saved_ui_settings, self.state.value, initial_user_refs) |
| default_model_type = initial_form.model_type |
| default_process_choices = initial_form.process_choices |
| default_process_name = initial_form.process_name |
| default_state = initial_form.form_state |
| active_job = {"job": None, "running": False, "cancel_requested": False, "write_state": None} |
| preview_state = {"image": None} |
| ui_skip = object() |
|
|
| def refresh_preview(_refresh_id): |
| return preview_state["image"] |
|
|
| def _button_update(label: str, enabled: bool | None): |
| return gr.skip() if enabled is None else gr.update(value=label, interactive=enabled) |
|
|
| def _ui_update(status=ui_skip, output=ui_skip, preview_refresh=ui_skip, *, start_enabled: bool | None = None, abort_enabled: bool | None = None): |
| status_update = gr.skip() if status is ui_skip else status |
| output_update = gr.skip() if output is ui_skip else status_ui.render_output_file_html(output) |
| preview_update = gr.skip() if preview_refresh is ui_skip else preview_refresh |
| start_update = _button_update("Start Process", start_enabled) |
| abort_update = _button_update("Stop", abort_enabled) |
| return status_update, output_update, preview_update, start_update, abort_update |
|
|
| def _info_exit(message: str, *, output=ui_skip, total_chunks: int = 1, completed_chunks: int = 0, current_chunk: int = 1, continued: bool = False): |
| gr.Info(str(message or "").strip()) |
| return _ui_update(status_ui.render_chunk_status_html(total_chunks, completed_chunks, current_chunk, "Info", str(message or "").strip(), continued=continued), output, ui_skip, start_enabled=True, abort_enabled=False) |
|
|
| def _reset_live_chunk_status(state: dict) -> None: |
| gen = state.get("gen") if isinstance(state, dict) else None |
| if not isinstance(gen, dict): |
| return |
| gen["status"] = "" |
| gen["status_display"] = False |
| gen["progress_args"] = None |
| gen["progress_phase"] = None |
| gen["progress_status"] = "" |
| gen["preview"] = None |
|
|
| def _add_user_process_link(process_value: str, main_state: dict | None, main_lset_name: str | None, user_refs: list[str] | None): |
| if str(process_value or "").strip() == ui_constants.NO_USER_SETTINGS_VALUE: |
| raise gr.Error("No user settings are available to add.") |
| process_definition = library.process_definition(process_value, main_state, user_refs) |
| if process_definition is None: |
| raise gr.Error("The selected user settings file could not be found.") |
| problems = library.validate_user_process_definition(process_definition) |
| if len(problems) > 0: |
| gr.Info(library.format_user_process_validation_error(process_definition, problems)) |
| return (gr.skip(),) * 9 |
| ref = catalog.normalize_user_settings_ref(process_definition.get("ref")) |
| if len(ref) == 0: |
| raise gr.Error("The selected user settings file could not be linked.") |
| refs = catalog.get_saved_user_settings_refs({catalog.USER_SETTINGS_STORAGE_KEY: user_refs}) |
| model_label = library.model_type_label(library.process_definition_model_type(process_definition)) |
| if ref.casefold() not in {item.casefold() for item in refs}: |
| refs.append(ref) |
| catalog.store_user_settings_refs(refs) |
| gr.Info(f'User settings "{process_definition.get("name")}" have been added for {model_label}.') |
| else: |
| gr.Info(f'User settings "{process_definition.get("name")}" are already linked for {model_label}.') |
| process_choices, selected = library.current_user_settings_choices(main_state, main_lset_name) |
| if str(process_value or "").strip() in {value for _label, value in process_choices}: |
| selected = str(process_value or "").strip() |
| return ( |
| refs, |
| gr.update(choices=library.model_type_choices(refs), value=ui_constants.ADD_USER_SETTINGS_MODEL_TYPE), |
| gr.update(choices=process_choices, value=selected), |
| form_controller.user_settings_hint_update(process_choices), |
| selected, |
| *form_controller.settings_action_updates(ui_constants.ADD_USER_SETTINGS_MODEL_TYPE, selected), |
| ) |
|
|
| def _delete_user_process_link(memory_state: dict | None, process_value: str, main_state: dict | None, user_refs: list[str] | None, source_path: str): |
| process_value = str(process_value or "").strip() |
| ref = catalog.user_process_ref_from_value(process_value) |
| if len(ref) == 0: |
| raise gr.Error("Choose a linked user settings process to remove.") |
| old_refs = catalog.get_saved_user_settings_refs({catalog.USER_SETTINGS_STORAGE_KEY: user_refs}) |
| deleted_definition = library.build_user_process_definition(ref) |
| deleted_model_type = library.process_definition_model_type(deleted_definition) |
| new_refs = [item for item in old_refs if item.casefold() != ref.casefold()] |
| catalog.store_user_settings_refs(new_refs) |
| model_type, next_process_name, process_choices = library.select_after_user_process_delete(process_value, deleted_model_type, old_refs, new_refs) |
| restored = form_controller.restore_state(memory_state, next_process_name, source_path, main_state, new_refs) |
| gr.Info(f'Removed user settings "{Path(ref).stem}".') |
| action_updates = form_controller.settings_action_updates(model_type, next_process_name) |
| return ( |
| new_refs, |
| gr.update(choices=library.model_type_choices(new_refs), value=model_type), |
| gr.update(choices=process_choices, value=next_process_name), |
| form_controller.user_settings_hint_update(process_choices), |
| next_process_name, |
| *action_updates, |
| *restored, |
| ) |
|
|
| model_type_choices = initial_form.model_type_choices |
| initial_process_form_memory = {default_process_name: default_state.to_dict()} |
|
|
| process_runner = ProcessRunner( |
| plugin=self, |
| api_session=api_session, |
| library=library, |
| get_model_def=get_model_def, |
| active_job=active_job, |
| preview_state=preview_state, |
| ui_skip=ui_skip, |
| ui_update=_ui_update, |
| info_exit=_info_exit, |
| reset_live_chunk_status=_reset_live_chunk_status, |
| ) |
|
|
| def stop_process(): |
| active_job["cancel_requested"] = True |
| write_state = active_job.get("write_state") |
| if write_state is not None: |
| write_state.stopped = True |
| job = active_job.get("job") |
| if job is not None and not job.done: |
| try: |
| job.cancel() |
| except RuntimeError as exc: |
| print(f"[Process Full Video] Stop requested; WanGP abort bridge was not available: {exc}") |
| common.plugin_info("Stopping current processing job...") |
| return gr.update(value="Start Process", interactive=False), gr.update(value="Stop", interactive=False) |
| if active_job.get("running"): |
| return gr.update(value="Start Process", interactive=False), gr.update(value="Stop", interactive=False) |
| return gr.update(value="Start Process", interactive=True), gr.update(value="Stop", interactive=False) |
|
|
| def _form_values(source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value): |
| return FormComponentValues(source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value) |
|
|
| def _store_memory(memory_state, current_process_name, main_state, refs, source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value): |
| values = _form_values(source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value) |
| return form_controller.store_memory(memory_state, current_process_name, main_state, refs, values) |
|
|
| def _change_process_model_type(memory_state, current_process_name, next_model_type, main_state, main_lset_name, refs, source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value): |
| values = _form_values(source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value) |
| return form_controller.change_process_model_type(memory_state, current_process_name, next_model_type, main_state, main_lset_name, refs, values) |
|
|
| def _change_process_name(memory_state, current_process_name, next_process_name, process_model_type_value, main_state, refs, source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value): |
| values = _form_values(source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value) |
| return form_controller.change_process_name(memory_state, current_process_name, next_process_name, process_model_type_value, main_state, refs, values) |
|
|
| def _refresh_from_main(refresh_id, memory_state, current_process_name, process_model_type_value, main_state, main_lset_name, refs, source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value): |
| values = _form_values(source_path_value, process_strength_value, output_path_value, prompt_value, continue_value, source_audio_track_value, output_resolution_value, target_ratio_value, chunk_size_value, overlap_value, start_value, end_value) |
| return form_controller.refresh_from_main(refresh_id, memory_state, current_process_name, process_model_type_value, main_state, main_lset_name, refs, values) |
|
|
| process_form_memory = gr.State(initial_process_form_memory) |
| active_process_name_state = gr.State(default_process_name) |
| user_process_refs = gr.State(initial_user_refs) |
| with gr.Column(): |
| gr.HTML( |
| """ |
| <style> |
| #process-full-video-settings-actions { |
| align-self: flex-end !important; |
| margin-bottom: 1px; |
| padding-bottom: 4px !important; |
| gap: 4px; |
| width: 34px !important; |
| min-width: 34px !important; |
| max-width: 34px !important; |
| } |
| #process-full-video-settings-actions > .form { |
| padding: 0 !important; |
| border: 0 !important; |
| background: transparent !important; |
| box-shadow: none !important; |
| } |
| #process-full-video-settings-actions button { |
| width: 34px !important; |
| min-width: 34px !important; |
| max-width: 34px !important; |
| height: 34px; |
| min-height: 34px; |
| padding: 0 !important; |
| } |
| #process-full-video-settings-actions .process-full-video-settings-action-placeholder { |
| display: none !important; |
| width: 0 !important; |
| min-width: 0 !important; |
| max-width: 0 !important; |
| height: 0 !important; |
| min-height: 0 !important; |
| overflow: hidden !important; |
| } |
| #process-full-video-user-settings-hint-row { |
| height: 12px !important; |
| min-height: 0 !important; |
| max-height: 12px !important; |
| margin-top: -10px !important; |
| margin-bottom: -4px !important; |
| padding: 0 !important; |
| overflow: visible !important; |
| } |
| #process-full-video-user-settings-hint-row > .form { |
| padding: 0 !important; |
| border: 0 !important; |
| background: transparent !important; |
| box-shadow: none !important; |
| min-height: 0 !important; |
| overflow: visible !important; |
| } |
| #process-full-video-user-settings-hint-row .block, |
| #process-full-video-user-settings-hint-row .html-container, |
| #process-full-video-user-settings-hint-row .prose { |
| height: auto !important; |
| margin: 0 !important; |
| min-height: 0 !important; |
| padding: 0 !important; |
| overflow: visible !important; |
| } |
| </style> |
| """ |
| ) |
| with gr.Row(): |
| gr.Markdown("This PlugIn is a *Super Sliding Windows* mode with *Low RAM requirements*, lossless Audio Copy and no risk to explode your Web Browser and the *Video Gallery* with huge files. You can stop a Process and Resume it later. You can define different prompts for different time range. However quite often the prompt should have little impact on the ouput.") |
| with gr.Row(): |
| process_model_type = gr.Dropdown(model_type_choices, value=default_model_type, label="Model", scale=1) |
| process_name = gr.Dropdown(default_process_choices, value=default_process_name, label="Process", scale=3) |
| with gr.Column(scale=0, min_width=34, visible=default_model_type == ui_constants.ADD_USER_SETTINGS_MODEL_TYPE or catalog.is_user_process_value(default_process_name), elem_id="process-full-video-settings-actions") as settings_actions_column: |
| add_user_settings_btn = gr.Button("\u2795", size="sm", min_width=1, visible=default_model_type == ui_constants.ADD_USER_SETTINGS_MODEL_TYPE, elem_classes=["wangp-assistant-chat__template-tool-icon-btn"]) |
| delete_user_settings_btn = gr.Button("\U0001F5D1\uFE0F", size="sm", min_width=1, visible=catalog.is_user_process_value(default_process_name), elem_classes=["wangp-assistant-chat__template-tool-icon-btn", "wangp-assistant-chat__template-tool-icon-btn--danger"]) |
| settings_actions_placeholder = gr.HTML("<div class='process-full-video-settings-action-placeholder'></div>", visible=False) |
| with gr.Row(visible=library.process_choices_have_user_settings(default_process_choices), elem_id="process-full-video-user-settings-hint-row") as process_user_settings_hint_row: |
| gr.HTML(value=ui_constants.USER_SETTINGS_HINT_HTML) |
| source_path = LocalFilePickerTextbox(label="Source Video Path File", value=default_state.source_path, file_extensions=VIDEO_FILE_EXTENSIONS, popup_title="Browse Local Source Video").mount() |
| with gr.Row(): |
| output_path = gr.Textbox(label="Output File Path File (None for auto, Full Name or Target Folder)", value=default_state.output_path, scale=3) |
| continue_enabled = gr.Checkbox(label="Continue", value=default_state.continue_enabled, elem_classes="cbx_bottom", scale=1) |
| with gr.Row(): |
| output_resolution = gr.Dropdown(output_resolution_choices, value=default_state.output_resolution, label="Output Resolution", visible=initial_form.output_resolution_visible) |
| default_process_strength = 1.0 if initial_form.target_ratio_visible else default_state.process_strength |
| process_strength = gr.Slider(label="Process Strength (LoRA Multiplier)", minimum=min(0.0, default_process_strength), maximum=max(3.0, default_process_strength), step=0.01, value=default_process_strength, visible=initial_form.process_strength_visible) |
| with gr.Row(): |
| chunk_size_seconds = gr.Number(label="Chunk Size (seconds)", value=default_state.chunk_size_seconds, precision=2) |
| target_ratio = gr.Dropdown(initial_form.target_ratio_choices if initial_form.target_ratio_visible else ui_constants.RATIO_CHOICES_WITH_EMPTY, value=default_state.target_ratio if initial_form.target_ratio_visible else "", label=initial_form.target_ratio_label, visible=initial_form.target_ratio_visible) |
| sliding_window_overlap = gr.Slider(label="Sliding Window Overlap", minimum=0 if not initial_form.overlap_visible else 1, maximum=initial_form.overlap_max, step=initial_form.overlap_step, value=default_state.sliding_window_overlap, visible=initial_form.overlap_visible) |
| with gr.Row(): |
| start_seconds = gr.Textbox(label="Start (s/MM:SS(.xx)/HH:MM:SS(.xx))", value=default_state.start_seconds, placeholder="seconds, MM:SS(.xx), or HH:MM:SS(.xx)") |
| end_seconds = gr.Textbox(label="End (s/MM:SS(.xx)/HH:MM:SS(.xx))", value=default_state.end_seconds, placeholder="seconds, MM:SS(.xx), or HH:MM:SS(.xx)") |
| source_audio_track = gr.Dropdown(source_audio_track_choices, value=default_state.source_audio_track, label="Source Audio Track") |
| with gr.Row(): |
| prompt_text = gr.Textbox( |
| label="Prompt (timed blocks supported: MM:SS(.xx) / HH:MM:SS(.xx))", |
| value=default_state.prompt, |
| lines=1, |
| placeholder=prompts.TIMED_PROMPT_EXAMPLE, |
| visible=initial_form.prompt_visible, |
| ) |
| with gr.Row(): |
| start_btn = gr.Button("Start Process") |
| abort_btn = gr.Button("Stop", interactive=False) |
| status_html = gr.HTML(value=status_ui.render_chunk_status_html(0, 0, 0, "Idle", "Waiting to start...")) |
| preview_image = gr.Image(label="Last Frame Preview", type="pil") |
| output_file = gr.HTML(value=status_ui.render_output_file_html("")) |
| preview_refresh = gr.Textbox(value="", visible=False) |
| tab_refresh_trigger = gr.Textbox(value="", visible=False) |
|
|
| self.on_tab_outputs = [tab_refresh_trigger] |
|
|
| gr.on( |
| [ |
| source_path.change, |
| process_strength.change, |
| output_path.change, |
| prompt_text.change, |
| continue_enabled.change, |
| source_audio_track.change, |
| output_resolution.change, |
| target_ratio.change, |
| chunk_size_seconds.change, |
| sliding_window_overlap.change, |
| start_seconds.change, |
| end_seconds.change, |
| ], |
| fn=_store_memory, |
| inputs=[process_form_memory, active_process_name_state, self.state, user_process_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| outputs=[process_form_memory], |
| queue=False, |
| show_progress="hidden", |
| ) |
| process_model_type.change( |
| fn=_change_process_model_type, |
| inputs=[process_form_memory, active_process_name_state, process_model_type, self.state, self.lset_name, user_process_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| outputs=[process_form_memory, active_process_name_state, process_name, process_user_settings_hint_row, settings_actions_column, add_user_settings_btn, delete_user_settings_btn, settings_actions_placeholder, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| queue=False, |
| show_progress="hidden", |
| ) |
| process_name.change( |
| fn=_change_process_name, |
| inputs=[process_form_memory, active_process_name_state, process_name, process_model_type, self.state, user_process_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| outputs=[process_form_memory, active_process_name_state, settings_actions_column, delete_user_settings_btn, settings_actions_placeholder, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| queue=False, |
| show_progress="hidden", |
| ) |
| add_user_settings_btn.click( |
| fn=_add_user_process_link, |
| inputs=[process_name, self.state, self.lset_name, user_process_refs], |
| outputs=[user_process_refs, process_model_type, process_name, process_user_settings_hint_row, active_process_name_state, settings_actions_column, add_user_settings_btn, delete_user_settings_btn, settings_actions_placeholder], |
| show_progress="hidden", |
| ) |
| delete_user_settings_btn.click( |
| fn=_delete_user_process_link, |
| inputs=[process_form_memory, process_name, self.state, user_process_refs, source_path], |
| outputs=[user_process_refs, process_model_type, process_name, process_user_settings_hint_row, active_process_name_state, settings_actions_column, add_user_settings_btn, delete_user_settings_btn, settings_actions_placeholder, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| show_progress="hidden", |
| ) |
| self.refresh_form_trigger.change( |
| fn=_refresh_from_main, |
| inputs=[self.refresh_form_trigger, process_form_memory, active_process_name_state, process_model_type, self.state, self.lset_name, user_process_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| outputs=[process_model_type, process_name, process_user_settings_hint_row, process_form_memory, active_process_name_state, settings_actions_column, add_user_settings_btn, delete_user_settings_btn, settings_actions_placeholder, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| queue=False, |
| show_progress="hidden", |
| ) |
| tab_refresh_trigger.change( |
| fn=_refresh_from_main, |
| inputs=[tab_refresh_trigger, process_form_memory, active_process_name_state, process_model_type, self.state, self.lset_name, user_process_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| outputs=[process_model_type, process_name, process_user_settings_hint_row, process_form_memory, active_process_name_state, settings_actions_column, add_user_settings_btn, delete_user_settings_btn, settings_actions_placeholder, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| queue=False, |
| show_progress="hidden", |
| ) |
| start_btn.click( |
| fn=process_runner.start_process, |
| inputs=[self.state, process_name, user_process_refs, source_path, process_strength, output_path, prompt_text, continue_enabled, source_audio_track, output_resolution, target_ratio, chunk_size_seconds, sliding_window_overlap, start_seconds, end_seconds], |
| outputs=[status_html, output_file, preview_refresh, start_btn, abort_btn], |
| queue=False, |
| show_progress="hidden", |
| show_progress_on=[], |
| ) |
| preview_refresh.change(fn=refresh_preview, inputs=[preview_refresh], outputs=[preview_image], queue=False, show_progress="hidden") |
| abort_btn.click(fn=stop_process, outputs=[start_btn, abort_btn], queue=False, show_progress="hidden") |
|
|