import os
import re
import shutil
import subprocess
import time
from collections.abc import Mapping

import torch
import numpy as np

# OpenCV for video decoding. Any failure while importing (not only
# ImportError) is treated as "OpenCV unavailable"; the error is reported
# later, when a video actually has to be decoded.
try:
    import cv2

    _has_cv2 = True
except Exception:
    _has_cv2 = False


# =========================
# AUDIO (inlined from utils)
# =========================

# Arguments for bytes.decode(): never fail on undecodable ffmpeg output.
ENCODE_ARGS = ("utf-8", "backslashreplace")


def _pick_ffmpeg_path():
    """Locate an ffmpeg executable.

    Lookup order:
      1. the ``VHS_FORCE_FFMPEG_PATH`` environment variable (if non-empty),
      2. ``ffmpeg`` found on ``PATH``,
      3. a file named ``ffmpeg`` / ``ffmpeg.exe`` in the current working
         directory.

    Returns:
        The path as a string, or ``None`` when nothing was found.
    """
    # 1) env override (as in VHS)
    forced = os.environ.get("VHS_FORCE_FFMPEG_PATH")
    if forced:
        return forced

    # 2) system ffmpeg
    system_ffmpeg = shutil.which("ffmpeg")
    if system_ffmpeg is not None:
        return system_ffmpeg

    # 3) local binary in the current working directory
    for candidate in ("ffmpeg", "ffmpeg.exe"):
        if os.path.isfile(candidate):
            return os.path.abspath(candidate)

    return None


ffmpeg_path = _pick_ffmpeg_path()


def get_audio(file, start_time=0, duration=0):
    """Extract audio from ``file`` with ffmpeg as raw 32-bit float samples.

    Args:
        file: Path of the media file passed to ffmpeg via ``-i``.
        start_time: When > 0, passed to ffmpeg as ``-ss``.
        duration: When > 0, passed to ffmpeg as ``-t``.

    Returns:
        A dict with ``"waveform"`` (float32 tensor shaped
        ``(1, channels, samples)``) and ``"sample_rate"`` (int). When ffmpeg
        succeeds but produces no samples, the waveform is an empty tensor of
        shape ``(1, 1, 0)``.

    Raises:
        Exception: If ffmpeg was not found, or ffmpeg exits with an error
            (its stderr is included in the message).
    """
    if ffmpeg_path is None:
        raise Exception("ffmpeg not found. Put ffmpeg in PATH, or set VHS_FORCE_FFMPEG_PATH env var.")

    args = [ffmpeg_path, "-i", file]
    if start_time > 0:
        args += ["-ss", str(start_time)]
    if duration > 0:
        args += ["-t", str(duration)]

    try:
        # as in utils: dump raw f32le samples to stdout
        res = subprocess.run(args + ["-f", "f32le", "-"], capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"Failed to extract audio from {file}:\n" + e.stderr.decode(*ENCODE_ARGS)) from e

    # Sample rate and channel layout are scraped from ffmpeg's stream banner.
    # NOTE(review): layouts other than "mono"/"stereo" are treated as 2
    # channels, and layouts that are not a plain word will not match at all —
    # confirm whether multichannel input needs to be supported.
    match = re.search(r", (\d+) Hz, (\w+), ", res.stderr.decode(*ENCODE_ARGS))
    if match:
        ar = int(match.group(1))
        ac = {"mono": 1, "stereo": 2}.get(match.group(2), 2)
    else:
        ar = 44100
        ac = 2

    if not res.stdout:
        # No samples were produced — return an empty audio buffer in the
        # correct format. This must be checked before torch.frombuffer(),
        # which rejects zero-length buffers.
        empty = torch.zeros((1, 1, 0), dtype=torch.float32)
        return {"waveform": empty, "sample_rate": ar}

    # bytearray() gives frombuffer a writable copy of the immutable bytes.
    audio = torch.frombuffer(bytearray(res.stdout), dtype=torch.float32)

    # reshape as in utils:
    # (-1, channels) -> (channels, samples) -> (1, channels, samples)
    audio = audio.reshape((-1, ac)).transpose(0, 1).unsqueeze(0)
    return {"waveform": audio, "sample_rate": ar}


class LazyAudioMap(Mapping):
    """Read-only mapping that runs :func:`get_audio` on first access.

    Construction is free; the ffmpeg call happens the first time the mapping
    is indexed, iterated or measured, and its result is cached afterwards.
    """

    def __init__(self, file, start_time, duration):
        self.file = file
        self.start_time = start_time
        self.duration = duration
        # Filled in by _ensure() with the dict returned by get_audio().
        self._dict = None

    def _ensure(self):
        """Load the audio once; later calls are no-ops."""
        if self._dict is None:
            self._dict = get_audio(self.file, self.start_time, self.duration)

    def __getitem__(self, key):
        self._ensure()
        return self._dict[key]

    def __iter__(self):
        self._ensure()
        return iter(self._dict)

    def __len__(self):
        self._ensure()
        return len(self._dict)


def lazy_get_audio(file, start_time=0, duration=0, **kwargs):
    """Return a :class:`LazyAudioMap` for ``file``; extra kwargs are ignored."""
    return LazyAudioMap(file, start_time, duration)


# =========================
# rest of the node code
# =========================


def extract_first_number(s):
    """Return the first run of digits in ``s`` as an int, or ``inf`` if none."""
    match = re.search(r"\d+", s)
    return int(match.group()) if match else float("inf")


sort_methods = [
    "None",
    "Alphabetical (ASC)",
    "Alphabetical (DESC)",
    "Numerical (ASC)",
    "Numerical (DESC)",
    "Datetime (ASC)",
    "Datetime (DESC)",
]


def sort_by(items, base_path=".", method=None):
    """Sort file names according to one of ``sort_methods``.

    Args:
        items: File names (relative to ``base_path``).
        base_path: Directory used to resolve names for the datetime methods.
        method: One of the strings in ``sort_methods``. Any other value
            (including ``"None"`` and ``None``) returns ``items`` unchanged.

    Returns:
        A new sorted list, or ``items`` itself when no sorting is requested.
    """

    def get_timestamp(name):
        # Entries whose mtime cannot be read sort as the oldest instead of
        # aborting the whole sort.
        try:
            return os.path.getmtime(os.path.join(base_path, name))
        except OSError:
            return float("-inf")

    def numeric_key(name):
        # Strip the extension so digits in it (e.g. "mp4") are not picked up.
        return extract_first_number(os.path.splitext(name)[0])

    if method == "Alphabetical (ASC)":
        return sorted(items)
    if method == "Alphabetical (DESC)":
        return sorted(items, reverse=True)
    if method == "Numerical (ASC)":
        return sorted(items, key=numeric_key)
    if method == "Numerical (DESC)":
        return sorted(items, key=numeric_key, reverse=True)
    if method == "Datetime (ASC)":
        return sorted(items, key=get_timestamp)
    if method == "Datetime (DESC)":
        return sorted(items, key=get_timestamp, reverse=True)
    return items


def target_size(width, height, custom_width, custom_height, downscale_ratio=8):
    """Compute the output frame size, rounded to a multiple of ``downscale_ratio``.

    Args:
        width, height: Source frame size.
        custom_width, custom_height: Requested size. ``0`` for one of them
            derives it from the other while keeping the source aspect ratio;
            ``0`` for both keeps the source size.
        downscale_ratio: Both output sides are rounded to the nearest
            multiple of this value (``None`` means 8).

    Returns:
        ``(new_width, new_height)``; each side is at least
        ``downscale_ratio`` so the result is always a valid resize target.
    """
    if downscale_ratio is None:
        downscale_ratio = 8

    if custom_width == 0 and custom_height == 0:
        new_w, new_h = width, height
    elif custom_height == 0:
        new_h = int(height * (custom_width / width))
        new_w = int(custom_width)
    elif custom_width == 0:
        new_w = int(width * (custom_height / height))
        new_h = int(custom_height)
    else:
        new_w, new_h = int(custom_width), int(custom_height)

    def _snap(value):
        # Round to the nearest multiple; never collapse to 0, which would be
        # an invalid size for the subsequent resize.
        snapped = int(value / downscale_ratio + 0.5) * downscale_ratio
        return max(snapped, downscale_ratio)

    return _snap(new_w), _snap(new_h)


def _read_frames_vhs_like(
    video_path: str,
    force_rate: float = 0,
    custom_width: int = 0,
    custom_height: int = 0,
    downscale_ratio: int = 8,
    frame_load_cap: int = 0,
):
    """Decode a video with OpenCV, optionally resampling the frame rate.

    Args:
        video_path: Path of the video file.
        force_rate: Target frame rate; ``0`` keeps the source rate.
        custom_width, custom_height, downscale_ratio: Forwarded to
            :func:`target_size` to decide the output frame size.
        frame_load_cap: Maximum number of frames to return; ``0`` = no limit.

    Returns:
        ``(frames, source_fps, loaded_fps, loaded_duration, start_time)``
        where ``frames`` is a float32 tensor of stacked RGB frames scaled to
        ``[0, 1]``, ``loaded_duration`` is ``len(frames) / loaded_fps`` in
        seconds and ``start_time`` is always ``0.0``.

    Raises:
        RuntimeError: OpenCV is missing, or the first frame cannot be decoded.
        FileNotFoundError: The video cannot be opened.
    """
    if not _has_cv2:
        raise RuntimeError("OpenCV (cv2) not available. Install opencv-python.")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened() or not cap.grab():
            raise FileNotFoundError(f"Cannot open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps is None or fps <= 0:
            fps = 30.0  # fallback when the container reports no frame rate

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        ok0, frame0 = cap.retrieve()
        if not ok0 or frame0 is None:
            raise RuntimeError(f"Cannot retrieve first frame from: {video_path}")

        if width <= 0 or height <= 0:
            # Properties were not reported; take the size from the frame.
            height, width = frame0.shape[:2]

        base_dt = 1.0 / float(fps)
        target_dt = base_dt if force_rate == 0 else (1.0 / float(force_rate))
        loaded_fps = 1.0 / target_dt if target_dt > 0 else float(fps)

        new_w, new_h = target_size(width, height, custom_width, custom_height, downscale_ratio)
        do_resize = (new_w != width) or (new_h != height)

        def _process_frame(bgr):
            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            if do_resize:
                rgb = cv2.resize(rgb, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
            return rgb

        def _cap_reached():
            return frame_load_cap > 0 and len(frames) >= frame_load_cap

        frames = [_process_frame(frame0)]

        # time_offset is the source time grabbed since the last emitted
        # frame. Source frames are grabbed (cheap, no decode) until one full
        # target interval has accumulated; only then is a frame decoded.
        time_offset = 0.0
        while not _cap_reached() and cap.isOpened():
            if time_offset < target_dt:
                if not cap.grab():
                    break
                time_offset += base_dt
                continue

            ok, frame_bgr = cap.retrieve()
            if not ok or frame_bgr is None:
                break

            frames.append(_process_frame(frame_bgr))
            time_offset -= target_dt
    finally:
        # Release the capture on every path, including errors.
        cap.release()

    arr = np.stack(frames, axis=0).astype(np.float32)
    arr /= 255.0  # in place: avoids a second full-size float copy
    t = torch.from_numpy(arr)

    loaded_duration = float(len(t) * target_dt)
    return t, float(fps), float(loaded_fps), loaded_duration, 0.0


class LoadVideoBatchListFromDir:
    """Node that loads every video in a directory as a list of frame batches.

    For each video it outputs one frame tensor and one lazily evaluated
    audio mapping, plus the number of videos loaded.
    """

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "directory": ("STRING", {"default": ""}),
                "force_rate": ("FLOAT", {"default": 0, "min": 0, "max": 120, "step": 1}),
                "width": ("INT", {"default": 720, "min": 0, "max": 8192, "step": 1}),
                "height": ("INT", {"default": 1280, "min": 0, "max": 8192, "step": 1}),
            },
            "optional": {
                "video_load_cap": ("INT", {"default": 0, "min": 0, "step": 1}),
                "frame_load_cap": ("INT", {"default": 0, "min": 0, "step": 1}),
                "start_index": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFFFFFFFFFFFF, "step": 1}),
                "load_always": ("BOOLEAN", {"default": False, "label_on": "enabled", "label_off": "disabled"}),
                "sort_method": (sort_methods,),
            },
        }

    RETURN_TYPES = ("IMAGE", "AUDIO", "INT")
    RETURN_NAMES = ("IMAGE", "audio", "COUNT")
    # IMAGE and audio are emitted as lists (one entry per video); COUNT is not.
    OUTPUT_IS_LIST = (True, True, False)
    FUNCTION = "load_videos"
    CATEGORY = "video"

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        """Return a change token for the given inputs.

        With ``load_always`` set, NaN is returned; NaN never compares equal
        to itself, so the token always differs from the previous one.
        Otherwise the token is a hash of the input values.
        """
        if kwargs.get("load_always"):
            return float("NaN")
        return hash(frozenset(kwargs.items()))

    def load_videos(
        self,
        directory: str,
        force_rate: float = 0,
        width: int = 0,
        height: int = 0,
        video_load_cap: int = 0,
        frame_load_cap: int = 0,
        start_index: int = 0,
        load_always: bool = False,
        sort_method=None,
    ):
        """Load the videos found in ``directory``.

        Args:
            directory: Directory to scan (non-recursive).
            force_rate: Target frame rate; ``0`` keeps each video's own rate.
            width, height: Requested frame size (see :func:`target_size`).
            video_load_cap: Maximum number of videos to load; ``0`` = all.
            frame_load_cap: Maximum frames per video; ``0`` = all.
            start_index: Number of (sorted) videos to skip.
            load_always: Not used here; only consulted by ``IS_CHANGED``.
            sort_method: One of ``sort_methods``.

        Returns:
            ``(images_list, audios_list, count)``.

        Raises:
            FileNotFoundError: The directory is missing, empty, or contains
                no file with a supported video extension.
        """
        if not os.path.isdir(directory):
            raise FileNotFoundError(f"Directory '{directory}' cannot be found.")

        files = os.listdir(directory)
        if not files:
            raise FileNotFoundError(f"No files in directory '{directory}'.")

        valid_ext = {".mp4", ".mov", ".mkv", ".webm", ".avi", ".m4v"}
        files = [
            f
            for f in files
            if os.path.isfile(os.path.join(directory, f)) and os.path.splitext(f)[1].lower() in valid_ext
        ]
        if not files:
            raise FileNotFoundError(f"No video files in directory '{directory}' (expected: {sorted(valid_ext)}).")

        files = sort_by(files, directory, sort_method)
        files = files[start_index:]
        if video_load_cap > 0:
            files = files[:video_load_cap]

        images_list = []
        audios_list = []

        for fname in files:
            path = os.path.join(directory, fname)

            vid, source_fps, loaded_fps, loaded_duration, start_time = _read_frames_vhs_like(
                path,
                force_rate=force_rate,
                custom_width=width,
                custom_height=height,
                downscale_ratio=8,
                frame_load_cap=frame_load_cap,
            )
            images_list.append(vid)

            # Audio duration is based on the frames actually loaded, so the
            # audio matches the (possibly capped) video.
            audio = lazy_get_audio(path, start_time, loaded_duration)
            audios_list.append(audio)

        return (images_list, audios_list, len(images_list))