dtlzz / iterator_pro_deluxe.py
Stkzzzz222's picture
Upload iterator_pro_deluxe.py
6f918bc verified
import os
import torch
import numpy as np
import cv2
import time
# --- Archivos de Estado (uno para cada nodo para no mezclarlos) ---
VIDEO_STATE_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), "video_iterator_state.txt")
IMAGE_STATE_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), "image_iterator_state.txt")
# ======================================================================
# == NODO 1: ITERADOR DE VÍDEOS (EL QUE YA TENÍAMOS) ====================
# ======================================================================
class VideoIterator:
class AllVideosProcessed(Exception):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"folder_path": ("STRING", {"default": "C:/videos"}),
"reset_counter": ("BOOLEAN", {"default": False}),
}
}
RETURN_TYPES = ("IMAGE", "INT", "INT", "STRING")
RETURN_NAMES = ("images", "current_index", "total_videos", "filename")
FUNCTION = "iterate_and_load"
CATEGORY = "VideoBatch"
def iterate_and_load(self, folder_path, reset_counter):
if not os.path.isdir(folder_path): raise FileNotFoundError(f"La carpeta no existe: {folder_path}")
video_files = sorted([f for f in os.listdir(folder_path) if f.lower().endswith(('.mp4', '.mov', '.avi', '.mkv'))])
num_videos = len(video_files)
current_index = 0
if reset_counter:
with open(VIDEO_STATE_FILE, "w") as f: f.write("0")
print("VideoIterator: Contador reiniciado a 0.")
else:
try:
with open(VIDEO_STATE_FILE, "r") as f: current_index = int(f.read())
except (IOError, ValueError): pass
if num_videos > 0 and current_index >= num_videos:
with open(VIDEO_STATE_FILE, "w") as f: f.write("0")
raise self.AllVideosProcessed(f"Proceso finalizado. Se procesaron {num_videos} vídeos.")
next_index = current_index + 1
with open(VIDEO_STATE_FILE, "w") as f: f.write(str(next_index))
if not video_files: raise FileNotFoundError(f"No se encontraron vídeos en la carpeta: {folder_path}")
filename = video_files[current_index]
video_path = os.path.join(folder_path, filename)
print(f"VideoIterator: Cargando vídeo #{current_index}/{num_videos-1}: {filename}")
cap = cv2.VideoCapture(video_path)
frames = []
while cap.isOpened():
ret, frame = cap.read()
if not ret: break
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frames.append(torch.from_numpy(np.array(frame).astype(np.float32) / 255.0)[None,])
cap.release()
if not frames: return (torch.zeros((1, 64, 64, 3), dtype=torch.float32), current_index, num_videos, "ERROR_LOADING_VIDEO")
return (torch.cat(frames, dim=0), current_index, num_videos, filename)
@classmethod
def IS_CHANGED(cls, folder_path, reset_counter):
return time.time()
# ======================================================================
# == NODO 2: NUEVO ITERADOR DE IMÁGENES ================================
# ======================================================================
class ImageIterator:
class AllImagesProcessed(Exception):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"folder_path": ("STRING", {"default": "C:/images"}),
"reset_counter": ("BOOLEAN", {"default": False}),
}
}
RETURN_TYPES = ("IMAGE", "INT", "INT", "STRING")
RETURN_NAMES = ("image", "current_index", "total_images", "filename")
FUNCTION = "iterate_and_load"
CATEGORY = "ImageBatch" # Nueva categoría para encontrarlo fácilmente
def iterate_and_load(self, folder_path, reset_counter):
if not os.path.isdir(folder_path): raise FileNotFoundError(f"La carpeta no existe: {folder_path}")
# CAMBIO CLAVE: Buscamos extensiones de imagen
image_files = sorted([f for f in os.listdir(folder_path) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp'))])
num_images = len(image_files)
current_index = 0
if reset_counter:
with open(IMAGE_STATE_FILE, "w") as f: f.write("0")
print("ImageIterator: Contador reiniciado a 0.")
else:
try:
with open(IMAGE_STATE_FILE, "r") as f: current_index = int(f.read())
except (IOError, ValueError): pass
if num_images > 0 and current_index >= num_images:
with open(IMAGE_STATE_FILE, "w") as f: f.write("0")
raise self.AllImagesProcessed(f"Proceso finalizado. Se procesaron {num_images} imágenes.")
next_index = current_index + 1
with open(IMAGE_STATE_FILE, "w") as f: f.write(str(next_index))
if not image_files: raise FileNotFoundError(f"No se encontraron imágenes en la carpeta: {folder_path}")
filename = image_files[current_index]
image_path = os.path.join(folder_path, filename)
print(f"ImageIterator: Cargando imagen #{current_index}/{num_images-1}: {filename}")
# CAMBIO CLAVE: Cargamos una sola imagen, no un vídeo
img = cv2.imread(image_path)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # Convertimos a RGB
# Convertimos la imagen a un tensor de PyTorch con el formato correcto (batch de 1)
image_tensor = torch.from_numpy(np.array(img).astype(np.float32) / 255.0)[None,]
return (image_tensor, current_index, num_images, filename)
@classmethod
def IS_CHANGED(cls, folder_path, reset_counter):
return time.time()
# --- Registro de AMBOS Nodos ---
NODE_CLASS_MAPPINGS = {
"VideoIterator": VideoIterator,
"ImageIterator": ImageIterator
}
NODE_DISPLAY_NAME_MAPPINGS = {
"VideoIterator": "Video Iterator (Load & Count)",
"ImageIterator": "Image Iterator (Load & Count)"
}