# This folder is modified from https://github.com/Mikubill/sd-webui-controlnet
import os

import cv2
import folder_paths
import numpy as np
import torch
from einops import rearrange

from .dwpose_utils import DWposeDetector
from .zoe.zoedepth.models.zoedepth.zoedepth_v1 import ZoeDepth
from .zoe.zoedepth.utils.config import get_config

remote_onnx_det = "https://huggingface.co/yzd-v/DWPose/resolve/main/yolox_l.onnx"
remote_onnx_pose = "https://huggingface.co/yzd-v/DWPose/resolve/main/dw-ll_ucoco_384.onnx"
remote_zoe = "https://huggingface.co/lllyasviel/Annotators/resolve/main/ZoeD_M12_N.pt"

# Sub-folders of `folder_paths.models_dir` searched for third-party weights,
# in priority order.
_THIRD_PARTY_FOLDERS = (
    "CogVideoX_Fun/Third_Party",
    "Fun_Models/Third_Party",
    "VideoX_Fun/Third_Party",
)
# Sub-folder of `folder_paths.models_dir` that missing weights are downloaded to.
_DOWNLOAD_FOLDER = "Fun_Models/Third_Party"


def read_video(video_path):
    """Decode every frame of the video at `video_path`.

    Returns a list of frames as produced by `cv2.VideoCapture.read`, with
    the first and third channels swapped (OpenCV decodes to BGR, so the
    returned frames are RGB). The list is empty if the video cannot be
    opened or contains no frames.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # COLOR_BGR2RGB and COLOR_RGB2BGR perform the same channel swap;
            # the BGR2RGB spelling states the actual direction.
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Release the capture handle even if decoding/conversion raises.
        cap.release()
    return frames


def HWC3(x):
    """Return `x` as a 3-channel uint8 image of shape (H, W, 3).

    Accepts a uint8 array that is either 2-D (H, W) or 3-D with 1, 3 or 4
    channels:

    * 3 channels: returned unchanged.
    * 1 channel (or 2-D): the channel is repeated three times.
    * 4 channels: the first three channels are alpha-blended onto a white
      background using the fourth channel as alpha.

    Raises:
        AssertionError: if the dtype, rank or channel count is unsupported.
    """
    assert x.dtype == np.uint8
    if x.ndim == 2:
        x = x[:, :, None]
    assert x.ndim == 3
    H, W, C = x.shape
    assert C == 1 or C == 3 or C == 4
    if C == 3:
        return x
    if C == 1:
        return np.concatenate([x, x, x], axis=2)
    if C == 4:
        color = x[:, :, 0:3].astype(np.float32)
        alpha = x[:, :, 3:4].astype(np.float32) / 255.0
        # Composite over white: fully transparent pixels become 255.
        y = color * alpha + 255.0 * (1.0 - alpha)
        y = y.clip(0, 255).astype(np.uint8)
        return y


def pad64(x):
    """Return how much must be added to `x` to reach the next multiple of 64."""
    return int(np.ceil(float(x) / 64.0) * 64 - x)


def safer_memory(x):
    """Return a fresh, C-contiguous copy of `x`."""
    # Fix many MAC/AMD problems
    return np.ascontiguousarray(x.copy()).copy()


def resize_image_with_pad(input_image, resolution, skip_hwc3=False):
    """Resize an image so its shorter side equals `resolution`, then pad.

    The image is scaled uniformly (cubic interpolation when enlarging, area
    interpolation when shrinking) and then padded on the bottom and right by
    edge replication so that both spatial dimensions are multiples of 64.

    Args:
        input_image: image array; normalised with `HWC3` unless `skip_hwc3`.
        resolution: target length of the shorter side after resizing.
        skip_hwc3: if True, use `input_image` as-is (it must already be
            3-dimensional, since its shape is unpacked as H, W, C).

    Returns:
        A tuple `(padded_image, remove_pad)` where `remove_pad(x)` crops an
        array back to the resized (un-padded) height and width.
    """
    if skip_hwc3:
        img = input_image
    else:
        img = HWC3(input_image)
    H_raw, W_raw, _ = img.shape
    k = float(resolution) / float(min(H_raw, W_raw))
    interpolation = cv2.INTER_CUBIC if k > 1 else cv2.INTER_AREA
    H_target = int(np.round(float(H_raw) * k))
    W_target = int(np.round(float(W_raw) * k))
    img = cv2.resize(img, (W_target, H_target), interpolation=interpolation)
    H_pad, W_pad = pad64(H_target), pad64(W_target)
    img_padded = np.pad(img, [[0, H_pad], [0, W_pad], [0, 0]], mode='edge')

    def remove_pad(x):
        return safer_memory(x[:H_target, :W_target])

    return safer_memory(img_padded), remove_pad


def load_file_from_url(
    url: str,
    model_dir: str,
    progress: bool = True,
    file_name: str | None = None,
    hash_prefix: str | None = None,
) -> str:
    """Download a file from `url` into `model_dir`, using the file present if possible.

    `model_dir` is created if it does not exist. When `file_name` is not
    given, the last path component of `url` is used.

    Returns the path to the downloaded file.
    """
    from urllib.parse import urlparse

    os.makedirs(model_dir, exist_ok=True)
    if not file_name:
        parts = urlparse(url)
        file_name = os.path.basename(parts.path)
    cached_file = os.path.abspath(os.path.join(model_dir, file_name))
    if not os.path.exists(cached_file):
        print(f'Downloading: "{url}" to {cached_file}\n')
        from torch.hub import download_url_to_file

        download_url_to_file(url, cached_file, progress=progress, hash_prefix=hash_prefix)
    return cached_file


def _resolve_model_path(file_name, url):
    """Locate the weight file `file_name`, downloading it from `url` if absent.

    Search order: each folder in `_THIRD_PARTY_FOLDERS` under
    `folder_paths.models_dir`, then `file_name` relative to the current
    working directory. If none exists the file is downloaded into
    `_DOWNLOAD_FOLDER` under `folder_paths.models_dir`.
    """
    for folder in _THIRD_PARTY_FOLDERS:
        candidate_path = os.path.join(folder_paths.models_dir, folder, file_name)
        if os.path.exists(candidate_path):
            return candidate_path
    if os.path.exists(file_name):
        return file_name
    # Pass file_name explicitly so the saved name always matches what the
    # search above looks for on the next run.
    return load_file_from_url(
        url,
        model_dir=os.path.join(folder_paths.models_dir, _DOWNLOAD_FOLDER),
        file_name=file_name,
    )


def _load_frames(input_video, video_length):
    """Return the frames of `input_video` as uint8 images.

    * If `input_video` is a string it is treated as a path and decoded with
      `read_video`; all frames are returned (`video_length` is not applied
      to this branch).
    * Otherwise it is sliced to its first `video_length` entries, scaled by
      255 and cast to uint8.
    """
    if isinstance(input_video, str):
        return read_video(input_video)
    # Slice before scaling so frames beyond video_length are never converted.
    clip = input_video[:video_length]
    if isinstance(clip, torch.Tensor):
        # NumPy cannot read tensors that live on an accelerator or track grad.
        clip = clip.detach().cpu()
    return np.array(clip * 255, np.uint8)


class VideoToCanny:
    """Node that converts video frames into Canny edge maps."""

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "input_video": ("IMAGE",),
                "low_threshold": ("INT", {"default": 100, "min": 0, "max": 255, "step": 1}),
                "high_threshold": ("INT", {"default": 200, "min": 0, "max": 255, "step": 1}),
                "video_length": (
                    "INT", {"default": 81, "min": 1, "max": 81, "step": 4}
                ),
            }
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
    FUNCTION = "process"
    CATEGORY = "CogVideoXFUNWrapper"

    def process(self, input_video, low_threshold, high_threshold, video_length):
        """Run Canny edge detection on each frame.

        Args:
            input_video: a video file path, or an array/tensor of frames
                with values scaled so that multiplying by 255 gives uint8
                pixel values.
            low_threshold: first hysteresis threshold passed to `cv2.Canny`.
            high_threshold: second hysteresis threshold passed to `cv2.Canny`.
            video_length: maximum number of frames taken from a non-path
                `input_video`.

        Returns:
            A 1-tuple holding the stacked 3-channel edge maps divided by 255.
        """
        def extract_canny_frames(frames):
            canny_frames = []
            for frame in frames:
                gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
                edges = cv2.Canny(gray, low_threshold, high_threshold)
                # Replicate the single edge channel into three channels.
                canny_frames.append(cv2.cvtColor(edges, cv2.COLOR_GRAY2RGB))
            return canny_frames

        video_frames = _load_frames(input_video, video_length)
        output_video = extract_canny_frames(video_frames)
        output_video = torch.from_numpy(np.array(output_video)) / 255
        return (output_video,)


class VideoToDepth:
    """Node that converts video frames into ZoeDepth depth maps."""

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "input_video": ("IMAGE",),
                "video_length": (
                    "INT", {"default": 81, "min": 1, "max": 81, "step": 4}
                ),
            }
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
    FUNCTION = "process"
    CATEGORY = "CogVideoXFUNWrapper"

    def process_frame(self, model, image, device, weight_dtype):
        """Estimate depth for one frame and render it as a uint8 image.

        The frame is resized/padded with `resize_image_with_pad(image, 512)`,
        passed through `model.infer`, normalised between its 2nd and 85th
        percentiles, inverted, and converted to a 3-channel uint8 image with
        the padding removed.
        """
        with torch.no_grad():
            image, remove_pad = resize_image_with_pad(image, 512)
            image_depth = torch.from_numpy(image).to(device, weight_dtype)
            image_depth = image_depth / 255.0
            image_depth = rearrange(image_depth, 'h w c -> 1 c h w')
            depth = model.infer(image_depth)
            depth = depth[0, 0].cpu().numpy()

            vmin = np.percentile(depth, 2)
            vmax = np.percentile(depth, 85)
            depth -= vmin
            span = vmax - vmin
            # A flat depth map makes both percentiles equal; skip the divide
            # instead of producing NaN/inf and an undefined uint8 cast.
            if span > 0:
                depth /= span
            depth = 1.0 - depth
            depth_image = (depth * 255.0).clip(0, 255).astype(np.uint8)

        image = remove_pad(depth_image)
        image = HWC3(image)
        return image

    def process(self, input_video, video_length):
        """Build the ZoeDepth model, load its weights and process each frame.

        Args:
            input_video: a video file path, or an array/tensor of frames
                with values scaled so that multiplying by 255 gives uint8
                pixel values.
            video_length: maximum number of frames taken from a non-path
                `input_video`.

        Returns:
            A 1-tuple holding the stacked depth images divided by 255.
        """
        model = ZoeDepth.build_from_config(get_config("zoedepth", "infer"))

        # Find the checkpoint locally, or download it if it is missing.
        zoe_model_path = _resolve_model_path("ZoeD_M12_N.pt", remote_zoe)

        # NOTE(security): torch.load unpickles the checkpoint, so only load
        # files from a trusted source.
        model.load_state_dict(
            torch.load(zoe_model_path, map_location="cpu")['model'],
            strict=False
        )
        device = "cuda" if torch.cuda.is_available() else "cpu"
        weight_dtype = torch.float32
        model = model.to(device=device, dtype=weight_dtype).eval().requires_grad_(False)

        video_frames = _load_frames(input_video, video_length)
        output_video = [self.process_frame(model, frame, device, weight_dtype) for frame in video_frames]
        output_video = torch.from_numpy(np.array(output_video)) / 255
        return (output_video,)


class VideoToPose:
    """Node that converts video frames into DWPose pose renderings."""

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "input_video": ("IMAGE",),
                "video_length": (
                    "INT", {"default": 81, "min": 1, "max": 81, "step": 4}
                ),
            }
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
    FUNCTION = "process"
    CATEGORY = "CogVideoXFUNWrapper"

    def process_frame(self, model, image):
        """Run the pose detector on one frame and return a 3-channel image.

        The frame is resized/padded with `resize_image_with_pad(image, 512)`
        before being passed to `model`; the padding is removed afterwards.
        """
        with torch.no_grad():
            image, remove_pad = resize_image_with_pad(image, 512)
            pose_image = model(image)
            image = remove_pad(pose_image)
            image = HWC3(image)
        return image

    def process(self, input_video, video_length):
        """Load the DWPose ONNX models and process each frame.

        Args:
            input_video: a video file path, or an array/tensor of frames
                with values scaled so that multiplying by 255 gives uint8
                pixel values.
            video_length: maximum number of frames taken from a non-path
                `input_video`.

        Returns:
            A 1-tuple holding the stacked pose images divided by 255.
        """
        # Find each ONNX file locally, or download it if it is missing.
        onnx_det = _resolve_model_path("yolox_l.onnx", remote_onnx_det)
        onnx_pose = _resolve_model_path("dw-ll_ucoco_384.onnx", remote_onnx_pose)

        model = DWposeDetector(onnx_det, onnx_pose)

        video_frames = _load_frames(input_video, video_length)
        output_video = [self.process_frame(model, frame) for frame in video_frames]
        output_video = torch.from_numpy(np.array(output_video)) / 255
        return (output_video,)