| import os
|
| import torch
|
| import numpy as np
|
| import cv2
|
| import time
|
|
|
|
|
# Both iterator classes persist their position in small text files that live
# in the same directory as this module (symlinks resolved).
_MODULE_DIR = os.path.dirname(os.path.realpath(__file__))

VIDEO_STATE_FILE = os.path.join(_MODULE_DIR, "video_iterator_state.txt")
IMAGE_STATE_FILE = os.path.join(_MODULE_DIR, "image_iterator_state.txt")
|
|
|
|
|
|
|
|
|
class VideoIterator:
    """Node that loads one video per execution from a folder.

    The matching files are sorted by name and the position of the next file
    to serve is persisted in ``VIDEO_STATE_FILE``, so consecutive executions
    walk through the folder one video at a time.

    NOTE(review): the INPUT_TYPES / RETURN_TYPES / FUNCTION / CATEGORY layout
    looks like the ComfyUI custom-node convention - confirm against the host.
    """

    class AllVideosProcessed(Exception):
        """Raised when the persisted index is past the last video."""

    # File extensions (compared case-insensitively) treated as videos.
    _VIDEO_EXTENSIONS = ('.mp4', '.mov', '.avi', '.mkv')

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the two required inputs: the folder and a reset flag."""
        return {
            "required": {
                "folder_path": ("STRING", {"default": "C:/videos"}),
                "reset_counter": ("BOOLEAN", {"default": False}),
            }
        }

    RETURN_TYPES = ("IMAGE", "INT", "INT", "STRING")
    RETURN_NAMES = ("images", "current_index", "total_videos", "filename")
    FUNCTION = "iterate_and_load"
    CATEGORY = "VideoBatch"

    @staticmethod
    def _read_index():
        """Return the persisted index, or 0 if it is missing or unparsable."""
        try:
            with open(VIDEO_STATE_FILE, "r") as f:
                stored = int(f.read())
        except (OSError, ValueError):
            return 0
        # A negative value would silently index from the end of the list.
        return max(stored, 0)

    @staticmethod
    def _write_index(index):
        """Persist ``index`` as the position of the next video to serve."""
        with open(VIDEO_STATE_FILE, "w") as f:
            f.write(str(index))

    @staticmethod
    def _load_frames(video_path):
        """Decode every frame of ``video_path`` into RGB float32 tensors.

        Each frame is converted from BGR to RGB and divided by 255. The
        returned list is empty when no frame could be read.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(torch.from_numpy(rgb.astype(np.float32) / 255.0))
        finally:
            # Release the capture even if a frame conversion fails.
            cap.release()
        return frames

    def iterate_and_load(self, folder_path, reset_counter):
        """Load the video at the persisted index and advance the counter.

        Args:
            folder_path: Directory scanned (non-recursively) for video files.
            reset_counter: When true, restart from the first video.

        Returns:
            A tuple ``(images, current_index, total_videos, filename)`` where
            ``images`` holds the frames stacked along a new leading
            dimension. If no frame could be decoded, ``images`` is a single
            black 64x64 frame and ``filename`` is ``"ERROR_LOADING_VIDEO"``.

        Raises:
            FileNotFoundError: The folder does not exist or has no videos.
            AllVideosProcessed: The persisted index is past the last video;
                the counter is reset to 0 before raising.
        """
        if not os.path.isdir(folder_path):
            raise FileNotFoundError(f"La carpeta no existe: {folder_path}")

        video_files = sorted(
            f for f in os.listdir(folder_path)
            if f.lower().endswith(self._VIDEO_EXTENSIONS)
        )
        num_videos = len(video_files)

        if reset_counter:
            self._write_index(0)
            print("VideoIterator: Contador reiniciado a 0.")
            current_index = 0
        else:
            current_index = self._read_index()

        # Checked before the counter is advanced so that running against an
        # empty folder does not consume indices.
        if not video_files:
            raise FileNotFoundError(f"No se encontraron vídeos en la carpeta: {folder_path}")

        if current_index >= num_videos:
            self._write_index(0)
            raise self.AllVideosProcessed(f"Proceso finalizado. Se procesaron {num_videos} vídeos.")

        # The counter advances before decoding, so an unreadable video is
        # skipped on the next execution instead of being retried forever.
        self._write_index(current_index + 1)

        filename = video_files[current_index]
        video_path = os.path.join(folder_path, filename)
        print(f"VideoIterator: Cargando vídeo #{current_index}/{num_videos-1}: {filename}")

        frames = self._load_frames(video_path)
        if not frames:
            print(f"VideoIterator: No se pudo leer ningún fotograma de: {filename}")
            placeholder = torch.zeros((1, 64, 64, 3), dtype=torch.float32)
            return (placeholder, current_index, num_videos, "ERROR_LOADING_VIDEO")

        return (torch.stack(frames, dim=0), current_index, num_videos, filename)

    @classmethod
    def IS_CHANGED(cls, folder_path, reset_counter):
        """Return the current time, so the value differs on every call.

        NOTE(review): presumably this keeps the host from treating the node
        as cached - confirm against the host's IS_CHANGED contract.
        """
        return time.time()
|
|
|
|
|
|
|
|
|
class ImageIterator:
    """Node that loads one image per execution from a folder.

    The matching files are sorted by name and the position of the next file
    to serve is persisted in ``IMAGE_STATE_FILE``, so consecutive executions
    walk through the folder one image at a time.

    NOTE(review): the INPUT_TYPES / RETURN_TYPES / FUNCTION / CATEGORY layout
    looks like the ComfyUI custom-node convention - confirm against the host.
    """

    class AllImagesProcessed(Exception):
        """Raised when the persisted index is past the last image."""

    # File extensions (compared case-insensitively) treated as images.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the two required inputs: the folder and a reset flag."""
        return {
            "required": {
                "folder_path": ("STRING", {"default": "C:/images"}),
                "reset_counter": ("BOOLEAN", {"default": False}),
            }
        }

    RETURN_TYPES = ("IMAGE", "INT", "INT", "STRING")
    RETURN_NAMES = ("image", "current_index", "total_images", "filename")
    FUNCTION = "iterate_and_load"
    CATEGORY = "ImageBatch"

    @staticmethod
    def _read_index():
        """Return the persisted index, or 0 if it is missing or unparsable."""
        try:
            with open(IMAGE_STATE_FILE, "r") as f:
                stored = int(f.read())
        except (OSError, ValueError):
            return 0
        # A negative value would silently index from the end of the list.
        return max(stored, 0)

    @staticmethod
    def _write_index(index):
        """Persist ``index`` as the position of the next image to serve."""
        with open(IMAGE_STATE_FILE, "w") as f:
            f.write(str(index))

    def iterate_and_load(self, folder_path, reset_counter):
        """Load the image at the persisted index and advance the counter.

        Args:
            folder_path: Directory scanned (non-recursively) for image files.
            reset_counter: When true, restart from the first image.

        Returns:
            A tuple ``(image, current_index, total_images, filename)`` where
            ``image`` is the RGB image divided by 255 with a leading
            dimension of size 1. If the file could not be decoded, ``image``
            is a single black 64x64 frame and ``filename`` is
            ``"ERROR_LOADING_IMAGE"``.

        Raises:
            FileNotFoundError: The folder does not exist or has no images.
            AllImagesProcessed: The persisted index is past the last image;
                the counter is reset to 0 before raising.
        """
        if not os.path.isdir(folder_path):
            raise FileNotFoundError(f"La carpeta no existe: {folder_path}")

        image_files = sorted(
            f for f in os.listdir(folder_path)
            if f.lower().endswith(self._IMAGE_EXTENSIONS)
        )
        num_images = len(image_files)

        if reset_counter:
            self._write_index(0)
            print("ImageIterator: Contador reiniciado a 0.")
            current_index = 0
        else:
            current_index = self._read_index()

        # Checked before the counter is advanced so that running against an
        # empty folder does not consume indices.
        if not image_files:
            raise FileNotFoundError(f"No se encontraron imágenes en la carpeta: {folder_path}")

        if current_index >= num_images:
            self._write_index(0)
            raise self.AllImagesProcessed(f"Proceso finalizado. Se procesaron {num_images} imágenes.")

        # The counter advances before decoding, so an unreadable image is
        # skipped on the next execution instead of being retried forever.
        self._write_index(current_index + 1)

        filename = image_files[current_index]
        image_path = os.path.join(folder_path, filename)
        print(f"ImageIterator: Cargando imagen #{current_index}/{num_images-1}: {filename}")

        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals failure by returning None rather than
            # raising; passing that on to cvtColor would fail with an opaque
            # OpenCV error, so hand back a placeholder instead.
            print(f"ImageIterator: No se pudo leer la imagen: {filename}")
            placeholder = torch.zeros((1, 64, 64, 3), dtype=torch.float32)
            return (placeholder, current_index, num_images, "ERROR_LOADING_IMAGE")

        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        image_tensor = torch.from_numpy(rgb.astype(np.float32) / 255.0)[None,]

        return (image_tensor, current_index, num_images, filename)

    @classmethod
    def IS_CHANGED(cls, folder_path, reset_counter):
        """Return the current time, so the value differs on every call.

        NOTE(review): presumably this keeps the host from treating the node
        as cached - confirm against the host's IS_CHANGED contract.
        """
        return time.time()
|
|
|
|
|
|
|
# Node identifier -> class implementing that node.
NODE_CLASS_MAPPINGS = dict(
    VideoIterator=VideoIterator,
    ImageIterator=ImageIterator,
)

# Node identifier -> title shown for that node.
NODE_DISPLAY_NAME_MAPPINGS = dict(
    VideoIterator="Video Iterator (Load & Count)",
    ImageIterator="Image Iterator (Load & Count)",
)