from __future__ import annotations from collections.abc import Callable from pathlib import Path import gradio as gr from . import constants def choose_resolution(budget_label: str) -> str: resolutions = {"256p": "448x256", "320p": "576x320", "384p": "672x384", "480p": "832x480", "540p": "960x544", "720p": "1280x720", "900p": "1600x896", "1080p": "1920x1088"} try: return resolutions[str(budget_label)] except KeyError as exc: raise gr.Error(f"Unsupported Output Resolution: {budget_label}") from exc def format_time_token(seconds: float | None) -> str: if seconds in (None, ""): return "end" total_centiseconds = max(0, int(round(float(seconds) * 100.0))) total_seconds, centiseconds = divmod(total_centiseconds, 100) minutes, seconds_only = divmod(total_seconds, 60) hours, minutes = divmod(minutes, 60) seconds_text = f"{seconds_only:02d}" if centiseconds <= 0 else f"{seconds_only:02d}.{centiseconds:02d}" if hours > 0: return f"{hours:02d}h{minutes:02d}m{seconds_text}s" return f"{minutes:02d}m{seconds_text}s" def get_process_filename_token(process_name: str) -> str: words = str(process_name or "").strip().split() if len(words) == 0: return "process" token = "".join(char for char in words[0].lower() if char.isalnum() or char in {"-", "_"}) return token or "process" def _supported_suffix(preferred_suffix: str, default_container: str) -> str: preferred_container = str(preferred_suffix or "").strip().lower().lstrip(".") if preferred_container in constants.SUPPORTED_OUTPUT_CONTAINERS: return f".{preferred_container}" fallback_container = str(default_container or "mp4").strip().lower() or "mp4" return f".{fallback_container}" if fallback_container in constants.SUPPORTED_OUTPUT_CONTAINERS else ".mp4" def build_auto_output_path(source_path: str, process_name: str, ratio_text: str, output_resolution: str, start_seconds: float | None, end_seconds: float | None, output_dir: str | None = None, *, has_outpaint: bool = False, default_container: str = "mp4") -> str: source = Path(source_path) process_token = get_process_filename_token(process_name) resolution_suffix = str(output_resolution or "").strip() or "res" start_suffix = format_time_token(start_seconds) end_suffix = format_time_token(end_seconds) target_dir = source.parent if not output_dir else Path(output_dir) output_suffix = _supported_suffix(source.suffix, default_container) name_parts = [source.stem, process_token] if has_outpaint: name_parts.append(str(ratio_text or "").replace(":", "x") or "ratio") name_parts.extend([resolution_suffix, start_suffix, end_suffix]) return str(target_dir / f"{'_'.join(name_parts)}{output_suffix}") def make_output_variant(output: Path, *, notify: Callable[[str], None] | None = None) -> str: for index in range(2, 10000): candidate = output.with_name(f"{output.stem}_{index}{output.suffix}") if not candidate.exists(): if notify is not None: notify(f"Output file already exists. Using {candidate}") return str(candidate) raise gr.Error(f"Unable to find a free output filename for {output}") def list_continuation_output_paths(output_path: str) -> list[str]: output = Path(output_path) base_stem = f"{output.stem}_continue" candidates: list[tuple[int, str]] = [] for child in output.parent.glob(f"{base_stem}*{output.suffix}"): if not child.is_file(): continue if child.stem == base_stem: candidates.append((1, str(child))) continue prefix = base_stem + "_" if not child.stem.startswith(prefix): continue suffix = child.stem[len(prefix):] if suffix.isdigit(): candidates.append((int(suffix), str(child))) return [path for _, path in sorted(candidates)] def make_continuation_output_path(output_path: str) -> str: output = Path(output_path) existing_paths = list_continuation_output_paths(str(output)) if len(existing_paths) == 0: return str(output.with_name(f"{output.stem}_continue{output.suffix}")) max_index = 1 base_stem = f"{output.stem}_continue" for existing_path in existing_paths: existing_stem = Path(existing_path).stem if existing_stem == base_stem: max_index = max(max_index, 1) continue suffix = existing_stem[len(base_stem) + 1:] if suffix.isdigit(): max_index = max(max_index, int(suffix)) for index in range(max_index + 1, 10000): variant = output.with_name(f"{output.stem}_continue_{index}{output.suffix}") if not variant.exists(): return str(variant) raise gr.Error(f"Unable to find a free continuation filename for {output}") def build_requested_output_path(source_path: str, output_path: str, process_name: str, ratio_text: str, output_resolution: str, start_seconds: float | None, end_seconds: float | None, *, has_outpaint: bool = False, default_container: str = "mp4") -> Path: output_text = str(output_path or "").strip() if len(output_text) == 0: output = Path(build_auto_output_path(source_path, process_name, ratio_text, output_resolution, start_seconds, end_seconds, has_outpaint=has_outpaint, default_container=default_container)) elif output_text.endswith(("\\", "/")) or Path(output_text).is_dir(): output = Path(build_auto_output_path(source_path, process_name, ratio_text, output_resolution, start_seconds, end_seconds, output_dir=output_text, has_outpaint=has_outpaint, default_container=default_container)) else: output = Path(output_text) if not output.suffix: output = output.with_suffix(_supported_suffix("", default_container)) elif output.suffix.lstrip(".").lower() not in constants.SUPPORTED_OUTPUT_CONTAINERS: supported_text = ", ".join(f".{container}" for container in sorted(constants.SUPPORTED_OUTPUT_CONTAINERS)) raise gr.Error(f"Output File must use one of these container extensions: {supported_text}.") return output def resolve_output_path(source_path: str, output_path: str, process_name: str, ratio_text: str, output_resolution: str, start_seconds: float | None, end_seconds: float | None, continue_enabled: bool, *, has_outpaint: bool = False, default_container: str = "mp4", notify: Callable[[str], None] | None = None) -> tuple[str, bool]: output = build_requested_output_path(source_path, output_path, process_name, ratio_text, output_resolution, start_seconds, end_seconds, has_outpaint=has_outpaint, default_container=default_container) if continue_enabled: return str(output), output.exists() if output.exists(): return make_output_variant(output, notify=notify), False return str(output), False