1ripon1's picture
Upload folder using huggingface_hub
7344bef verified
Raw
History Blame Contribute Delete
6.91 kB
from __future__ import annotations
from collections.abc import Callable
from pathlib import Path
import gradio as gr
from . import constants
def choose_resolution(budget_label: str) -> str:
resolutions = {"256p": "448x256", "320p": "576x320", "384p": "672x384", "480p": "832x480", "540p": "960x544", "720p": "1280x720", "900p": "1600x896", "1080p": "1920x1088"}
try:
return resolutions[str(budget_label)]
except KeyError as exc:
raise gr.Error(f"Unsupported Output Resolution: {budget_label}") from exc
def format_time_token(seconds: float | None) -> str:
if seconds in (None, ""):
return "end"
total_centiseconds = max(0, int(round(float(seconds) * 100.0)))
total_seconds, centiseconds = divmod(total_centiseconds, 100)
minutes, seconds_only = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
seconds_text = f"{seconds_only:02d}" if centiseconds <= 0 else f"{seconds_only:02d}.{centiseconds:02d}"
if hours > 0:
return f"{hours:02d}h{minutes:02d}m{seconds_text}s"
return f"{minutes:02d}m{seconds_text}s"
def get_process_filename_token(process_name: str) -> str:
words = str(process_name or "").strip().split()
if len(words) == 0:
return "process"
token = "".join(char for char in words[0].lower() if char.isalnum() or char in {"-", "_"})
return token or "process"
def _supported_suffix(preferred_suffix: str, default_container: str) -> str:
preferred_container = str(preferred_suffix or "").strip().lower().lstrip(".")
if preferred_container in constants.SUPPORTED_OUTPUT_CONTAINERS:
return f".{preferred_container}"
fallback_container = str(default_container or "mp4").strip().lower() or "mp4"
return f".{fallback_container}" if fallback_container in constants.SUPPORTED_OUTPUT_CONTAINERS else ".mp4"
def build_auto_output_path(source_path: str, process_name: str, ratio_text: str, output_resolution: str, start_seconds: float | None, end_seconds: float | None, output_dir: str | None = None, *, has_outpaint: bool = False, default_container: str = "mp4") -> str:
source = Path(source_path)
process_token = get_process_filename_token(process_name)
resolution_suffix = str(output_resolution or "").strip() or "res"
start_suffix = format_time_token(start_seconds)
end_suffix = format_time_token(end_seconds)
target_dir = source.parent if not output_dir else Path(output_dir)
output_suffix = _supported_suffix(source.suffix, default_container)
name_parts = [source.stem, process_token]
if has_outpaint:
name_parts.append(str(ratio_text or "").replace(":", "x") or "ratio")
name_parts.extend([resolution_suffix, start_suffix, end_suffix])
return str(target_dir / f"{'_'.join(name_parts)}{output_suffix}")
def make_output_variant(output: Path, *, notify: Callable[[str], None] | None = None) -> str:
for index in range(2, 10000):
candidate = output.with_name(f"{output.stem}_{index}{output.suffix}")
if not candidate.exists():
if notify is not None:
notify(f"Output file already exists. Using {candidate}")
return str(candidate)
raise gr.Error(f"Unable to find a free output filename for {output}")
def list_continuation_output_paths(output_path: str) -> list[str]:
output = Path(output_path)
base_stem = f"{output.stem}_continue"
candidates: list[tuple[int, str]] = []
for child in output.parent.glob(f"{base_stem}*{output.suffix}"):
if not child.is_file():
continue
if child.stem == base_stem:
candidates.append((1, str(child)))
continue
prefix = base_stem + "_"
if not child.stem.startswith(prefix):
continue
suffix = child.stem[len(prefix):]
if suffix.isdigit():
candidates.append((int(suffix), str(child)))
return [path for _, path in sorted(candidates)]
def make_continuation_output_path(output_path: str) -> str:
output = Path(output_path)
existing_paths = list_continuation_output_paths(str(output))
if len(existing_paths) == 0:
return str(output.with_name(f"{output.stem}_continue{output.suffix}"))
max_index = 1
base_stem = f"{output.stem}_continue"
for existing_path in existing_paths:
existing_stem = Path(existing_path).stem
if existing_stem == base_stem:
max_index = max(max_index, 1)
continue
suffix = existing_stem[len(base_stem) + 1:]
if suffix.isdigit():
max_index = max(max_index, int(suffix))
for index in range(max_index + 1, 10000):
variant = output.with_name(f"{output.stem}_continue_{index}{output.suffix}")
if not variant.exists():
return str(variant)
raise gr.Error(f"Unable to find a free continuation filename for {output}")
def build_requested_output_path(source_path: str, output_path: str, process_name: str, ratio_text: str, output_resolution: str, start_seconds: float | None, end_seconds: float | None, *, has_outpaint: bool = False, default_container: str = "mp4") -> Path:
output_text = str(output_path or "").strip()
if len(output_text) == 0:
output = Path(build_auto_output_path(source_path, process_name, ratio_text, output_resolution, start_seconds, end_seconds, has_outpaint=has_outpaint, default_container=default_container))
elif output_text.endswith(("\\", "/")) or Path(output_text).is_dir():
output = Path(build_auto_output_path(source_path, process_name, ratio_text, output_resolution, start_seconds, end_seconds, output_dir=output_text, has_outpaint=has_outpaint, default_container=default_container))
else:
output = Path(output_text)
if not output.suffix:
output = output.with_suffix(_supported_suffix("", default_container))
elif output.suffix.lstrip(".").lower() not in constants.SUPPORTED_OUTPUT_CONTAINERS:
supported_text = ", ".join(f".{container}" for container in sorted(constants.SUPPORTED_OUTPUT_CONTAINERS))
raise gr.Error(f"Output File must use one of these container extensions: {supported_text}.")
return output
def resolve_output_path(source_path: str, output_path: str, process_name: str, ratio_text: str, output_resolution: str, start_seconds: float | None, end_seconds: float | None, continue_enabled: bool, *, has_outpaint: bool = False, default_container: str = "mp4", notify: Callable[[str], None] | None = None) -> tuple[str, bool]:
output = build_requested_output_path(source_path, output_path, process_name, ratio_text, output_resolution, start_seconds, end_seconds, has_outpaint=has_outpaint, default_container=default_container)
if continue_enabled:
return str(output), output.exists()
if output.exists():
return make_output_variant(output, notify=notify), False
return str(output), False