| import os |
| import hashlib |
| import json |
| import subprocess |
| import shutil |
| import re |
| import time,math |
| import numpy as np |
| from typing import List |
| import torch |
| from PIL import Image, ImageOps |
| from PIL.PngImagePlugin import PngInfo |
| import cv2,random,string |
| from pathlib import Path |
|
|
| import folder_paths |
| from comfy.k_diffusion.utils import FolderOfImages |
| from comfy.utils import common_upscale |
|
|
| import torchaudio |
| import base64 |
|
|
| import mimetypes |
|
|
|
|
|
|
def get_frames(frame_count, frames, revert=False):
    """Return exactly `frame_count` frames sampled from `frames`.

    Without `revert`, the sequence is truncated or cycled (looped with
    modulo indexing) to reach the requested length.  With `revert`, the
    sequence is first extended with its reverse (ping-pong, excluding the
    duplicated endpoints) and the same truncate-or-cycle rule is applied
    to the extended sequence.
    """
    source = frames + frames[-2:0:-1] if revert else frames
    if frame_count <= len(source):
        return source[:frame_count]
    return [source[idx % len(source)] for idx in range(frame_count)]
|
|
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
|
|
|
|
|
|
|
|
def get_mime_type(file_path):
    """Guess the MIME type of `file_path` from its extension, falling
    back to the generic binary type when the extension is unknown."""
    guessed, _encoding = mimetypes.guess_type(file_path)
    return guessed if guessed is not None else 'application/octet-stream'
| |
| |
|
|
|
|
def save_audio_base64s_to_file(base64_audios, output_folder, file_name):
    """Decode a list of base64 audio payloads, concatenate the raw bytes,
    and write them to `output_folder/file_name`.

    A data-URL prefix ("data:<mime>;base64,") on any payload is stripped
    before decoding.  The output folder is created if needed.  Returns
    the path of the written file.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    raw_chunks = []
    for payload in base64_audios:
        # Strip an optional "data:...;base64," prefix.
        if ',' in payload:
            payload = payload.split(',')[1]
        raw_chunks.append(base64.b64decode(payload))

    file_path = os.path.join(output_folder, file_name)
    with open(file_path, 'wb') as out:
        out.write(b''.join(raw_chunks))

    return file_path
|
|
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
def get_new_counter(full_output_folder, filename_prefix):
    """Return the next sequence number for files named
    ``<filename_prefix><number>.<ext>`` inside `full_output_folder`.

    Scans existing files, parses the numeric part directly following the
    prefix, and returns max + 1 (or 1 when no numbered file exists, or
    the folder does not exist yet).
    """
    # A missing folder simply means no files have been written yet.
    if not os.path.isdir(full_output_folder):
        return 1

    numbers = []
    for name in os.listdir(full_output_folder):
        if not name.startswith(filename_prefix):
            continue
        # Take everything after the prefix up to the first dot.
        # Fix: the previous version sliced at len(prefix) + 1, silently
        # dropping the first digit (e.g. "audio_10000.wav" parsed as 0),
        # which corrupted the max() computation once counters reached a
        # non-zero leading digit.
        number_part = name[len(filename_prefix):].split('.')[0]
        if number_part.isdigit():
            numbers.append(int(number_part))

    if not numbers:
        return 1

    return max(numbers) + 1
|
|
def crop_audio(input_file, start_time, duration):
    """Trim `input_file` in place to `duration` seconds starting at
    `start_time` (both in seconds) and return its path."""
    waveform, sample_rate = torchaudio.load(input_file)

    # Convert the time window from seconds to sample indices.
    first_sample = int(start_time * sample_rate)
    last_sample = first_sample + int(duration * sample_rate)

    # Overwrite the original file with the cropped waveform.
    torchaudio.save(input_file, waveform[:, first_sample:last_sample], sample_rate)

    return input_file
|
|
def generate_folder_name(directory, video_path):
    """Build a unique folder path under `directory` named
    ``<8 random lowercase/digit chars>_<video file name>``."""
    filename = os.path.basename(video_path)
    alphabet = string.ascii_lowercase + string.digits
    prefix = ''.join(random.choices(alphabet, k=8))
    return os.path.join(directory, prefix + '_' + filename)
|
|
def create_folder(directory, video_path):
    """Create (and return the path of) a uniquely named folder for
    `video_path` under `directory`."""
    path = generate_folder_name(directory, video_path)
    os.makedirs(path)
    return path
|
|
|
|
def split_video(video_path, video_segment_frames, transition_frames, output_dir):
    """Split `video_path` into consecutive AVI segments of
    `video_segment_frames` + `transition_frames` frames each, written to
    `output_dir` as ``segment_<n>.avi``.

    Returns a tuple ``(segment_paths, total_frames, fps)``.
    """
    video_capture = cv2.VideoCapture(video_path)

    total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = video_capture.get(cv2.CAP_PROP_FPS)

    # Each written segment spans the nominal segment plus the transition.
    segment_total_frames = video_segment_frames + transition_frames

    # NOTE(review): the numerator adds transition_frames rather than
    # segment_total_frames - 1, so this is not an exact ceiling division
    # of total_frames / segment_total_frames — confirm intent.
    num_segments = (total_frames + transition_frames - 1) // segment_total_frames

    vs = []
    start_frame = 0
    for i in range(num_segments):
        end_frame = min(start_frame + segment_total_frames, total_frames)

        print(f"Segment {i+1}: Start Frame {start_frame}, End Frame {end_frame}")

        segment_video_path = f"{output_dir}/segment_{i+1}.avi"

        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        segment_video = cv2.VideoWriter(segment_video_path, fourcc, fps,
                                        (int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
                                         int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))))

        # NOTE(review): read() consumes frames sequentially from the
        # current capture position and the loop never seeks to
        # `start_frame`, so the `+ transition_frames` skip applied below
        # is not reflected in the frames actually read — segments after
        # the first may be offset from the computed ranges.  Verify.
        for frame_num in range(start_frame, end_frame):
            ret, frame = video_capture.read()
            if ret:
                segment_video.write(frame)
            else:
                break

        # Fix: release the writer so the AVI container is finalized
        # immediately (previously it was only flushed when the object
        # happened to be garbage-collected).
        segment_video.release()

        # Advance past the transition overlap before the next segment.
        start_frame = end_frame + transition_frames
        vs.append(segment_video_path)

    video_capture.release()

    return (vs, total_frames, fps)
|
|
|
|
# Register the bundled ./video_formats/*.json files with ComfyUI's
# folder-path registry so they can be looked up by name.
folder_paths.folder_names_and_paths["video_formats"] = (
    [
        os.path.join(os.path.dirname(os.path.abspath(__file__)), ".", "video_formats"),
    ],
    [".json"]
)

# Locate ffmpeg: prefer a binary on PATH, then fall back to the one
# bundled with imageio-ffmpeg.  `ffmpeg_path` stays None when neither is
# available; ffmpeg-backed outputs are disabled in that case.
ffmpeg_path = shutil.which("ffmpeg")
if ffmpeg_path is None:
    print("ffmpeg could not be found. Using ffmpeg from imageio-ffmpeg.")
    try:
        # Fix: import inside the try — previously a missing imageio-ffmpeg
        # package raised ImportError before the fallback message could run.
        from imageio_ffmpeg import get_ffmpeg_exe
        ffmpeg_path = get_ffmpeg_exe()
    except Exception:
        # Fix: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt.
        print("ffmpeg could not be found. Outputs that require it have been disabled")
|
|
|
|
def combine_audio_video(audio_path, video_path, output_path):
    """Mux `audio_path` onto `video_path` and write the result to
    `output_path` (video stream copied, audio re-encoded to AAC, output
    truncated to the shorter stream).  Returns `output_path`.

    Raises:
        RuntimeError: if ffmpeg was not found at module load time.
        subprocess.CalledProcessError: if ffmpeg exits with an error.
    """
    # Fix: fail with a clear message instead of passing None as argv[0]
    # to subprocess (which raised an opaque TypeError).
    if ffmpeg_path is None:
        raise RuntimeError("ffmpeg is required to combine audio and video but was not found")

    command = [
        ffmpeg_path,
        '-i', video_path,
        '-i', audio_path,
        '-c:v', 'copy',   # copy the video stream untouched
        '-c:a', 'aac',    # re-encode audio to AAC
        '-shortest',      # stop at the shorter of the two streams
        output_path
    ]

    subprocess.run(command, check=True)
    return output_path
|
|
|
|
|
|
|
|
| |
def tensor2pil(image):
    """Convert a float image tensor with values in [0, 1] to an 8-bit
    PIL image (singleton dimensions are squeezed out first)."""
    array = image.cpu().numpy().squeeze()
    array = np.clip(255. * array, 0, 255).astype(np.uint8)
    return Image.fromarray(array)
|
|
| |
def pil2tensor(image):
    """Convert a PIL image (or any array-like of 8-bit values) to a
    float tensor scaled to [0, 1] with a leading batch dimension."""
    array = np.array(image).astype(np.float32) / 255.0
    return torch.from_numpy(array).unsqueeze(0)
|
|
def count_files(directory):
    """Recursively count regular files under `directory`."""
    return sum(len(files) for _root, _dirs, files in os.walk(directory))
|
|
def create_temp_file(image):
    """Save an image tensor as a PNG inside ComfyUI's temp directory and
    return a single-element preview list describing the saved file."""
    output_dir = folder_paths.get_temp_directory()

    # Include the current number of temp files in the name to reduce the
    # chance of collisions with files written by other nodes.
    c = count_files(output_dir)

    (
        full_output_folder,
        filename,
        counter,
        subfolder,
        _,
    ) = folder_paths.get_save_image_path('temp_', output_dir)

    image = tensor2pil(image)

    # Fix: use the prefix-derived name returned by get_save_image_path
    # (previously `filename` was fetched but a hard-coded placeholder was
    # used in the file name instead).
    image_file = f"{filename}_{c}_{counter:05}.png"

    image_path = os.path.join(full_output_folder, image_file)

    image.save(image_path, compress_level=4)

    return [{
        "filename": image_file,
        "subfolder": subfolder,
        "type": "temp"
    }]
|
|
|
|
def split_list(lst, chunk_size, transition_size):
    """Split `lst` into chunks of `chunk_size` elements, where each chunk
    is padded on both sides with up to `transition_size` overlapping
    elements from its neighbours (clamped at the list boundaries)."""
    return [
        lst[max(begin - transition_size, 0):begin + chunk_size + transition_size]
        for begin in range(0, len(lst), chunk_size)
    ]
|
|
| |
| |
| |
|
|
| |
| |
|
|
class ImageListReplace:
    """Replace the slice [start_index, end_index] of an image list with a
    single repeated image, an explicit image list, or (by default) black
    frames sized like the first image.

    With ``invert`` enabled the replacement applies to the frames OUTSIDE
    the selected range instead.  Operates on lists (INPUT_IS_LIST) and
    returns both the edited list and the originally selected images.
    """
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
                    "images": ("IMAGE",),
                    "start_index":("INT", {"default": 0, "min": 0, "step": 1}),
                    "end_index":("INT", {"default": 0, "min": 0, "step": 1}),
                    "invert": ("BOOLEAN", {"default": False}),
                },
                "optional":{
                    "image_replace": ("IMAGE",),
                    "images_replace": ("IMAGE",),
                }
        }

    RETURN_TYPES = ("IMAGE","IMAGE",)
    RETURN_NAMES = ("images","select_images",)
    FUNCTION = "run"
    CATEGORY = "♾️Mixlab/Video"

    OUTPUT_NODE = True
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True,True,)

    def run(self, images,start_index=[0],end_index=[0],invert=[False],image_replace=None,images_replace=None):
        # INPUT_IS_LIST: scalar widget values arrive wrapped in
        # one-element lists; unwrap them.
        start_index=start_index[0]
        end_index=end_index[0]
        invert=invert[0]

        # Pool of replacement frames consumed below.
        image_rs=[]

        # A single replacement image is repeated across the range.
        if image_replace!=None:
            for i in range(end_index-start_index+1):
                image_rs.append(image_replace[0])

        # An explicit replacement list overrides the single image.
        if images_replace!=None:
            image_rs=images_replace

        # Neither supplied: fall back to black frames matching the size
        # of the first input image.
        if image_replace==None and images_replace==None:
            first_image=tensor2pil(images[0][0])
            width, height = first_image.size
            image_replace=Image.new("RGB", (width, height), (0, 0, 0))
            image_replace=pil2tensor(image_replace)
            for i in range(end_index-start_index+1):
                image_rs.append(image_replace)

        # NOTE(review): when invert is True, a replacement frame is
        # consumed for every index OUTSIDE [start_index, end_index], but
        # image_rs was sized for the range itself (unless images_replace
        # was supplied) — image_rs[k] can raise IndexError when the
        # outside region is larger.  Confirm intended usage.
        new_images=[]
        select_images=[]
        k=0
        for i in range(len(images)):
            if i>=start_index and i<=end_index:
                if invert:
                    new_images.append(images[i])
                else:
                    new_images.append(image_rs[k])
                    select_images.append(images[i])
                    k+=1
            else:
                if invert:
                    new_images.append(image_rs[k])
                    select_images.append(images[i])
                    k+=1
                else:
                    new_images.append(images[i])

        # Build 300px-wide preview thumbnails for the node UI.
        imss=[]
        for i in range(len(images)):
            t=images[i][0]
            t=tensor2pil(t)
            t = t.convert("RGB")
            original_width, original_height = t.size
            scale = 300 / original_width
            new_height = int(original_height * scale)
            t = t.resize((300, new_height))

            ims=create_temp_file(pil2tensor(t))
            imss.append(ims[0])

        return {"ui":{"_images": imss},"result": (new_images,select_images,)}
|
|
| |
class LoadVideoAndSegment:
    """Load a video from the ComfyUI input directory and optionally split
    it into fixed-length scene segments (see `split_video`).

    A `video_segment_frames` of -1 disables splitting and returns the
    original file as the single scene.
    """
    @classmethod
    def INPUT_TYPES(s):
        video_extensions = ['webm', 'mp4', 'mkv', 'gif']
        input_dir = folder_paths.get_input_directory()
        files = []
        for f in os.listdir(input_dir):
            if os.path.isfile(os.path.join(input_dir, f)):
                file_parts = f.split('.')
                if len(file_parts) > 1 and (file_parts[-1] in video_extensions):
                    files.append(f)
        return {"required": {
                    "video": (sorted(files), {"video_upload": True}),
                    "video_segment_frames": ("INT", {"default": 10, "min": -1, "step": 1}),
                    "transition_frames": ("INT", {"default": 0, "min": 0, "step": 1}),
                },}

    CATEGORY = "♾️Mixlab/Video"

    RETURN_TYPES = ("SCENE_VIDEO","INT", "INT","INT",)
    RETURN_NAMES = ("scenes_video","scenes_count","frame_count","fps",)
    FUNCTION = "load_video"
    OUTPUT_NODE = True
    OUTPUT_IS_LIST = (True,False,False,False,)

    def is_gif(self, filename):
        # True when the file's extension is exactly "gif".
        file_parts = filename.split('.')
        return len(file_parts) > 1 and file_parts[-1] == "gif"

    def load_video_cv_fallback(self, video, frame_load_cap, skip_first_frames):
        """Decode the annotated video into a (N,H,W,C) float tensor in
        [0, 1].  `frame_load_cap` of 0 means no limit; the first
        `skip_first_frames` frames are discarded.

        Returns (images, frames_added).
        """
        try:
            video_cap = cv2.VideoCapture(folder_paths.get_annotated_filepath(video))
            if not video_cap.isOpened():
                raise ValueError(f"{video} could not be loaded with cv fallback.")

            images = []
            total_frame_count = 0
            frames_added = 0
            base_frame_time = 1/video_cap.get(cv2.CAP_PROP_FPS)

            target_frame_time = base_frame_time

            # Accumulated-time loop: frames would be dropped evenly if
            # target_frame_time exceeded the source frame time (here they
            # are equal, so every frame is kept).
            time_offset=0.0
            while video_cap.isOpened():
                if time_offset < target_frame_time:
                    is_returned, frame = video_cap.read()
                    if not is_returned:
                        break
                    time_offset += base_frame_time
                if time_offset < target_frame_time:
                    continue
                time_offset -= target_frame_time

                total_frame_count += 1
                if total_frame_count <= skip_first_frames:
                    continue

                # OpenCV decodes BGR; convert to RGB for PIL/torch.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                image = Image.fromarray(frame)
                image = ImageOps.exif_transpose(image)
                image = np.array(image, dtype=np.float32) / 255.0
                image = torch.from_numpy(image)[None,]
                images.append(image)
                frames_added += 1

                if frame_load_cap > 0 and frames_added >= frame_load_cap:
                    break
        finally:
            video_cap.release()
        # NOTE(review): raises if no frame was decoded (torch.cat on an
        # empty list) — callers appear to assume non-empty videos.
        images = torch.cat(images, dim=0)

        return (images, frames_added)

    def load_video(self, video,video_segment_frames,transition_frames ):
        """Resolve `video` to a path and split it into scene segments.

        Returns (scene_paths, scene_count, total_frames, fps).
        """
        video_path = folder_paths.get_annotated_filepath(video)

        tp=folder_paths.get_temp_directory()
        basename = os.path.basename(video_path)
        name_without_extension = os.path.splitext(basename)[0]

        # Segments are written into a uniquely named temp folder.
        folder_path = create_folder(tp,name_without_extension)

        if video_segment_frames==-1:
            # No splitting: the whole video is the single scene; the
            # capture is opened only to read the frame count and fps.
            scenes_video=[video_path]

            video_capture = cv2.VideoCapture(video_path)

            total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = video_capture.get(cv2.CAP_PROP_FPS)

            # Fix: release the capture handle (it was previously leaked
            # in this branch).
            video_capture.release()
        else:
            scenes_video,total_frames,fps=split_video(video_path,video_segment_frames,
                                                      transition_frames,folder_path)

        return (scenes_video,len(scenes_video), total_frames,fps,)

    @classmethod
    def IS_CHANGED(s, video, **kwargs):
        # Hash the file contents so re-uploading under the same name
        # re-triggers downstream execution.
        image_path = folder_paths.get_annotated_filepath(video)
        m = hashlib.sha256()
        with open(image_path, 'rb') as f:
            m.update(f.read())
        return m.digest().hex()

    @classmethod
    def VALIDATE_INPUTS(s, video, **kwargs):
        if not folder_paths.exists_annotated_filepath(video):
            return "Invalid image file: {}".format(video)

        return True
|
|
|
|
|
|
class LoadAndCombinedAudio_:
    """Concatenate base64 audio payloads into a numbered WAV file in the
    output directory, optionally cropping it to a time window, and return
    both the file path and an AUDIO preview dict."""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
                    "audios": ("AUDIOBASE64",),
                    "start_time": ("FLOAT" , {"default": 0, "min": 0, "max": 10000000, "step": 0.01}),
                    "duration": ("FLOAT" , {"default": 10, "min": -1, "max": 10000000, "step": 0.01}),
                },
        }

    CATEGORY = "♾️Mixlab/Audio"

    RETURN_TYPES = ("STRING","AUDIO",)
    RETURN_NAMES = ("audio_file_path","audio",)
    FUNCTION = "run"

    def run(self, audios, start_time, duration):
        out_dir = folder_paths.get_output_directory()

        # Pick the next free sequence number for the output name.
        next_index = get_new_counter(out_dir, 'audio_')
        target_name = f"audio_{next_index:05}.wav"

        saved_path = save_audio_base64s_to_file(audios['base64'], out_dir, target_name)

        # A duration of -1 means "keep the whole file".
        if duration > -1:
            crop_audio(saved_path, start_time, duration)

        preview = {
            "filename": target_name,
            "subfolder": "",
            "type": "output",
            "audio_path": saved_path
        }
        return (saved_path, preview,)
|
|
class CombineAudioVideo:
    """Mux an audio file onto a video file with ffmpeg and expose the
    result as a numbered output preview."""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
                    "video_file_path": ("STRING", {"forceInput": True}),
                    "audio_file_path": ("STRING", {"forceInput": True}),
                },
        }

    CATEGORY = "♾️Mixlab/Video"

    OUTPUT_NODE = True
    FUNCTION = "run"
    RETURN_TYPES = ()
    RETURN_NAMES = ()

    def run(self, video_file_path, audio_file_path):
        out_dir = folder_paths.get_output_directory()

        # Pick the next free sequence number for the output name.
        next_index = get_new_counter(out_dir, 'video_final_')

        # Keep the container format of the incoming video file.
        _base, extension = os.path.splitext(video_file_path)
        result_name = f"video_final_{next_index:05}{extension}"
        result_path = os.path.join(out_dir, result_name)

        combine_audio_video(audio_file_path, video_file_path, result_path)

        previews = [
            {
                "filename": result_name,
                "subfolder": "",
                "type": "output",
                "format": get_mime_type(result_name),
            }
        ]
        return {"ui": {"gifs": previews}}
|
|
| |
class VideoCombine_Adv:
    """Combine an image batch into an animated image (GIF/WebP via PIL)
    or, through ffmpeg, into a video file.  Returns the saved file path
    and a UI preview entry."""

    @classmethod
    def INPUT_TYPES(s):
        # ffmpeg-backed formats are only offered when ffmpeg is available.
        if ffmpeg_path is not None:
            ffmpeg_formats = ["video/"+x[:-5] for x in folder_paths.get_filename_list("video_formats")]
        else:
            ffmpeg_formats = []

        return {
            "required": {
                "image_batch": ("IMAGE",),
                "frame_rate": (
                    "INT",
                    {"default": 8, "min": 1, "step": 1},
                ),
                "loop_count": ("INT", {"default": 0, "min": 0, "max": 100, "step": 1}),
                "filename_prefix": ("STRING", {"default": "Comfyui"}),
                "format": (["image/gif", "image/webp"] + ffmpeg_formats,),
                "pingpong": ("BOOLEAN", {"default": False}),
                "save_image": ("BOOLEAN", {"default": True}),
                "metadata": ("BOOLEAN", {"default": False}),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
            },
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("video_file_path",)
    OUTPUT_NODE = True
    CATEGORY = "♾️Mixlab/Video"
    FUNCTION = "run"

    def save_with_tempfile(self, args, metadata, file_path, frames, env):
        """Fallback path: hand the metadata to ffmpeg through an
        FFMETADATA file instead of the command line (used when the
        metadata would exceed the platform's argument-length limit)."""
        os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)

        metadata_path = os.path.join(folder_paths.get_temp_directory(), "metadata.txt")

        # Escape characters that are special in ffmpeg's metadata format.
        metadata = metadata.replace("\\","\\\\")
        metadata = metadata.replace(";","\\;")
        metadata = metadata.replace("#","\\#")
        metadata = metadata.replace("\n","\\\n")
        with open(metadata_path, "w") as f:
            f.write(";FFMETADATA1\n")
            f.write(metadata)
        args = args[:1] + ["-i", metadata_path] + args[1:] + [file_path]
        with subprocess.Popen(args, stdin=subprocess.PIPE, env=env) as proc:
            for frame in frames:
                proc.stdin.write(frame.tobytes())

    def run(
        self,
        image_batch,
        frame_rate: int,
        loop_count: int,
        filename_prefix="AnimateDiff",
        format="image/gif",
        pingpong=False,
        save_image=True,
        metadata=False,
        prompt=None,
        extra_pnginfo=None,
    ):
        images=image_batch

        # Convert the tensor batch to PIL frames, padding to even
        # dimensions because most video codecs require them.
        frames: List[Image.Image] = []
        for image in images:
            img = 255.0 * image.cpu().numpy()
            img = Image.fromarray(np.clip(img, 0, 255).astype(np.uint8))

            if img.height % 2 != 0:
                img = img.resize((img.width, img.height + 1))

            if img.width % 2 != 0:
                img = img.resize((img.width + 1, img.height))

            frames.append(img)

        output_dir = (
            folder_paths.get_output_directory()
            if save_image
            else folder_paths.get_temp_directory()
        )
        (
            full_output_folder,
            filename,
            counter,
            subfolder,
            _,
        ) = folder_paths.get_save_image_path(filename_prefix, output_dir)

        # Fix: remember the boolean flag before `metadata` is reused for
        # the PngInfo object.  Previously the flag was clobbered by the
        # assignment below, so `if metadata == False` compared a PngInfo
        # instance to False and never fired — workflow metadata was
        # always embedded regardless of the toggle.
        embed_metadata = metadata

        metadata = PngInfo()
        video_metadata = {}
        if prompt is not None:
            metadata.add_text("prompt", json.dumps(prompt))
            video_metadata["prompt"] = prompt
        if extra_pnginfo is not None:
            for x in extra_pnginfo:
                metadata.add_text(x, json.dumps(extra_pnginfo[x]))
                video_metadata[x] = extra_pnginfo[x]

        # When the toggle is off, strip the metadata from both the PNG
        # and the ffmpeg comment.
        if embed_metadata == False:
            metadata = PngInfo()
            video_metadata = {}

        # The first frame is also written as a standalone PNG (it carries
        # the workflow metadata when enabled).
        file = f"{filename}_{counter:05}_.png"
        file_path = os.path.join(full_output_folder, file)
        frames[0].save(
            file_path,
            pnginfo=metadata,
            compress_level=4,
        )
        if pingpong:
            # Append the reversed sequence (excluding the duplicated
            # endpoints) for a back-and-forth loop.
            frames = frames + frames[-2:0:-1]

        format_type, format_ext = format.split("/")
        file = f"{filename}_{counter:05}_.{format_ext}"
        file_path = os.path.join(full_output_folder, file)
        if format_type == "image":
            # Animated image: let PIL write all frames into one file.
            frames[0].save(
                file_path,
                format=format_ext.upper(),
                save_all=True,
                append_images=frames[1:],
                duration=round(1000 / frame_rate),
                loop=loop_count,
                compress_level=4,
            )
        else:
            # Video: stream raw RGB frames into ffmpeg through stdin.
            if ffmpeg_path is None:
                raise ProcessLookupError("Could not find ffmpeg")

            video_format_path = folder_paths.get_full_path("video_formats", format_ext + ".json")
            with open(video_format_path, 'r') as stream:
                video_format = json.load(stream)
            file = f"{filename}_{counter:05}_.{video_format['extension']}"
            file_path = os.path.join(full_output_folder, file)
            dimensions = f"{frames[0].width}x{frames[0].height}"
            metadata_args = ["-metadata", "comment=" + json.dumps(video_metadata)]
            args = [ffmpeg_path, "-v", "error", "-f", "rawvideo", "-pix_fmt", "rgb24",
                    "-s", dimensions, "-r", str(frame_rate), "-i", "-"] \
                    + video_format['main_pass']

            # Windows limits a single command line to ~32k characters;
            # POSIX limits are far larger.
            if os.name == 'posix':
                max_arg_length = 4096*32
            else:
                max_arg_length = 32767 - len(" ".join(args + [metadata_args[0]] + [file_path])) - 1

            env=os.environ.copy()
            if "environment" in video_format:
                env.update(video_format["environment"])
            if len(metadata_args[1]) >= max_arg_length:
                print(f"Using fallback file for extremely long metadata: {len(metadata_args[1])}/{max_arg_length}")
                self.save_with_tempfile(args, metadata_args[1], file_path, frames, env)
            else:
                try:
                    with subprocess.Popen(args + metadata_args + [file_path],
                                          stdin=subprocess.PIPE, env=env) as proc:
                        for frame in frames:
                            proc.stdin.write(frame.tobytes())
                except FileNotFoundError as e:
                    # Windows reports an over-long command line as
                    # winerror 206 wrapped in FileNotFoundError.
                    if "winerror" in dir(e) and e.winerror == 206:
                        print("Metadata was too long. Retrying with fallback file")
                        self.save_with_tempfile(args, metadata_args[1], file_path, frames, env)
                    else:
                        raise
                except OSError as e:
                    # POSIX reports the same condition as E2BIG (errno 7).
                    if "errno" in dir(e) and e.errno == 7:
                        print("Metadata was too long. Retrying with fallback file")
                        self.save_with_tempfile(args, metadata_args[1], file_path, frames, env)
                    else:
                        raise

        previews = [
            {
                "filename": file,
                "subfolder": subfolder,
                "type": "output" if save_image else "temp",
                "format": format,
            }
        ]
        return {"ui": {"gifs": previews},"result":(file_path,)}
|
|
|
|
class VAEEncodeForInpaint_Frames:
    """Per-frame variant of VAEEncodeForInpaint: encode each image/mask
    pair of a frame list into an inpainting latent dict
    (``samples`` + ``noise_mask``)."""
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
                    "vae": ("VAE", ),
                    "images": ("IMAGE", ),
                    "masks": ("MASK", ),
                    "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),
                }}

    FUNCTION = "encode"

    RETURN_TYPES = ("LATENT",)
    RETURN_NAMES = ("LATENT",)

    CATEGORY = "♾️Mixlab/Video"

    OUTPUT_NODE = True
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True,)

    def encode(self, vae, images, masks, grow_mask_by=[6]):
        # INPUT_IS_LIST: unwrap the one-element widget lists.
        vae=vae[0]
        grow_mask_by=grow_mask_by[0]

        result=[]

        for i in range(len(images)):
            pixels=images[i]
            mask=masks[i]

            # Dimensions aligned down to multiples of 8 (the VAE's
            # downsampling factor).
            x = (pixels.shape[1] // 8) * 8
            y = (pixels.shape[2] // 8) * 8
            # Resize the mask to the pixel resolution.
            mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")

            pixels = pixels.clone()
            if pixels.shape[1] != x or pixels.shape[2] != y:
                # Center-crop both pixels and mask to the 8-aligned size.
                x_offset = (pixels.shape[1] % 8) // 2
                y_offset = (pixels.shape[2] % 8) // 2
                pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
                mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]

            # Grow (dilate) the binarized mask with a box kernel so the
            # inpainted region extends past the drawn mask.
            if grow_mask_by == 0:
                mask_erosion = mask
            else:
                kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
                padding = math.ceil((grow_mask_by - 1) / 2)

                mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1)

            # Neutralize masked pixels to 0.5 grey before encoding.
            # NOTE: the inner loop reuses the name `i`; harmless in
            # Python (the outer range iterator is unaffected) but it
            # shadows the frame index within this block.
            m = (1.0 - mask.round()).squeeze(1)
            for i in range(3):
                pixels[:,:,:,i] -= 0.5
                pixels[:,:,:,i] *= m
                pixels[:,:,:,i] += 0.5
            t = vae.encode(pixels)

            result.append({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())})

        return (result, )
| |
|
|
|
|
class GenerateFramesByCount:
    """Resample a frame batch to an exact frame count by truncating,
    looping, or ping-pong looping (see `get_frames`)."""

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {
                    "frames": ('IMAGE',),
                    "frame_count": ("INT", {"default": 72, "min": 1, "step": 1}),
                    "revert" :("BOOLEAN", {"default": True},),
                },}

    RETURN_TYPES = ('IMAGE',)
    RETURN_NAMES = ("frames",)

    FUNCTION = "r"
    CATEGORY = "♾️Mixlab/Video"

    def r(self, frames, frame_count, revert):
        # Split the batch tensor into a list of single-frame tensors.
        singles = [frames[idx:idx + 1, ...] for idx in range(frames.shape[0])]

        # Truncate / loop / ping-pong to the requested length.
        resampled = get_frames(frame_count, singles, revert)

        # Re-assemble the selected frames into one batch tensor.
        return (torch.cat(resampled, dim=0),)
|
|
|
|
class scenesNode_:
    """Pick one scene video (by index) from a SCENE_VIDEO list and decode
    it into an image batch with OpenCV."""
    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {
                    "scenes_video": ('SCENE_VIDEO',),
                    "index": ("INT", {"default": 0, "min": 0, "step": 1}),
                },}

    RETURN_TYPES = ('IMAGE','INT',)
    RETURN_NAMES = ("frames","count",)

    FUNCTION = "run"
    CATEGORY = "♾️Mixlab/Video"
    INPUT_IS_LIST = True

    def load_video_cv_fallback(self, video, frame_load_cap, skip_first_frames):
        """Decode `video` into a (N,H,W,C) float tensor in [0, 1].

        `frame_load_cap` of 0 means no limit; the first
        `skip_first_frames` frames are discarded.  Returns
        (images, frames_added).
        """
        print('#video',video)
        try:
            video_cap = cv2.VideoCapture(video)
            if not video_cap.isOpened():
                raise ValueError(f"{video} could not be loaded with cv fallback.")

            images = []
            total_frame_count = 0
            frames_added = 0
            base_frame_time = 1/video_cap.get(cv2.CAP_PROP_FPS)

            target_frame_time = base_frame_time

            # Accumulated-time loop: frames would be dropped evenly if
            # target_frame_time exceeded the source frame time (here they
            # are equal, so every frame is kept).
            time_offset=0.0
            while video_cap.isOpened():
                if time_offset < target_frame_time:
                    is_returned, frame = video_cap.read()
                    if not is_returned:
                        break
                    time_offset += base_frame_time
                if time_offset < target_frame_time:
                    continue
                time_offset -= target_frame_time

                total_frame_count += 1
                if total_frame_count <= skip_first_frames:
                    continue

                # OpenCV decodes BGR; convert to RGB for PIL/torch.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                image = Image.fromarray(frame)
                image = ImageOps.exif_transpose(image)
                image = np.array(image, dtype=np.float32) / 255.0
                image = torch.from_numpy(image)[None,]
                images.append(image)
                frames_added += 1

                if frame_load_cap > 0 and frames_added >= frame_load_cap:
                    break
        finally:
            video_cap.release()

        # NOTE(review): raises if no frame was decoded (torch.cat on an
        # empty list) — confirm callers guarantee non-empty videos.
        images = torch.cat(images, dim=0)

        return (images, frames_added,)

    def run(self, scenes_video,index):
        print('#scenes_video',index,scenes_video)
        # INPUT_IS_LIST: the index widget arrives as a one-element list.
        index=index[0]
        if len(scenes_video) > index:
            vp=scenes_video[index]

            return self.load_video_cv_fallback(vp,0,0)

        # NOTE(review): an out-of-range index returns an empty list where
        # an IMAGE tensor is declared — downstream nodes may not handle
        # this shape.  Verify intended behavior.
        return ([], 0,)