import os import imageio import imageio_ffmpeg as ffmpeg import matplotlib.pyplot as plt import numpy as np from matplotlib import cm from PIL import Image from tqdm import tqdm class LowMemoryVideo: def __init__(self, file_name): self.reader = imageio.get_reader(file_name) def __len__(self): return self.reader.count_frames() def __getitem__(self, item): return Image.fromarray(np.array(self.reader.get_data(item))).convert("RGB") def __del__(self): self.reader.close() def split_file_name(file_name): result = [] number = -1 for i in file_name: if ord(i) >= ord("0") and ord(i) <= ord("9"): if number == -1: number = 0 number = number * 10 + ord(i) - ord("0") else: if number != -1: result.append(number) number = -1 result.append(i) if number != -1: result.append(number) result = tuple(result) return result def search_for_images(folder): file_list = [ i for i in os.listdir(folder) if i.endswith(".jpg") or i.endswith(".png") ] file_list = [(split_file_name(file_name), file_name) for file_name in file_list] file_list = [i[1] for i in sorted(file_list)] file_list = [os.path.join(folder, i) for i in file_list] return file_list class LowMemoryImageFolder: def __init__(self, folder, file_list=None): if file_list is None: self.file_list = search_for_images(folder) else: self.file_list = [ os.path.join(folder, file_name) for file_name in file_list ] def __len__(self): return len(self.file_list) def __getitem__(self, item): return Image.open(self.file_list[item]).convert("RGB") def __del__(self): pass def crop_and_resize(image, height, width): image = np.array(image) image_height, image_width, _ = image.shape if image_height / image_width < height / width: croped_width = int(image_height / height * width) left = (image_width - croped_width) // 2 image = image[:, left: left + croped_width] image = Image.fromarray(image).resize((width, height)) else: croped_height = int(image_width / width * height) left = (image_height - croped_height) // 2 image = image[left: left + croped_height, :] image = Image.fromarray(image).resize((width, height)) return image class VideoData: def __init__( self, video_file=None, image_folder=None, height=None, width=None, **kwargs ): if video_file is not None: self.data_type = "video" self.data = LowMemoryVideo(video_file, **kwargs) elif image_folder is not None: self.data_type = "images" self.data = LowMemoryImageFolder(image_folder, **kwargs) else: raise ValueError("Cannot open video or image folder") self.length = None self.set_shape(height, width) def raw_data(self): frames = [] for i in range(self.__len__()): frames.append(self.__getitem__(i)) return frames def set_length(self, length): self.length = length def set_shape(self, height, width): self.height = height self.width = width def __len__(self): if self.length is None: return len(self.data) else: return self.length def shape(self): if self.height is not None and self.width is not None: return self.height, self.width else: height, width, _ = self.__getitem__(0).shape return height, width def __getitem__(self, item): frame = self.data.__getitem__(item) width, height = frame.size if self.height is not None and self.width is not None: if self.height != height or self.width != width: frame = crop_and_resize(frame, self.height, self.width) return frame def __del__(self): pass def save_images(self, folder): os.makedirs(folder, exist_ok=True) for i in tqdm(range(self.__len__()), desc="Saving images"): frame = self.__getitem__(i) frame.save(os.path.join(folder, f"{i}.png")) def save_video_ffmpeg(frames, save_path, fps): # frames: numpy array T H W C, dtype=uint8 frames = np.array(frames) if frames.dtype != np.uint8: frames = (frames * 255).clip(0, 255).astype(np.uint8) T, H, W, C = frames.shape assert C in [1, 3, 4] writer = ffmpeg.write_frames( save_path, (W, H), fps=fps, quality=9, macro_block_size=None, # 避免补边 ) for frame in frames: writer.send(frame) writer.close() def save_video(frames, save_path, fps, quality=9, ffmpeg_params=None, grayscale=True): writer = imageio.get_writer( save_path, fps=fps, quality=quality, macro_block_size=1, ffmpeg_params=ffmpeg_params ) if not grayscale: cmap = plt.get_cmap('Spectral_r') lut = (cmap(np.linspace(0, 1, 256))[:, :3] * 255).astype(np.uint8) for frame in frames: frame = np.array(frame) if not grayscale: if frame.ndim == 3: if frame.shape[-1] >= 3: frame = frame[..., 0] elif frame.shape[-1] == 1: frame = frame[..., 0] if frame.dtype in [np.float32, np.float64]: indices = (frame * 255).clip(0, 255).astype(np.uint8) else: indices = frame.clip(0, 255).astype(np.uint8) frame_out = lut[indices] else: if frame.dtype in [np.float32, np.float64]: frame_out = (frame * 255).clip(0, 255).astype(np.uint8) else: frame_out = frame.astype(np.uint8) if frame_out.ndim == 3 and frame_out.shape[-1] == 1: frame_out = frame_out[..., 0] writer.append_data(frame_out) writer.close() # writer = imageio.get_writer( # save_path, fps=fps, quality=quality, ffmpeg_params=ffmpeg_params # ) # import matplotlib.pyplot as plt # from matplotlib import cm # cmap = plt.get_cmap('Spectral_r') if not grayscale else None # for frame in frames: # frame = np.array(frame) # if not grayscale: # if frame.ndim == 3 and frame.shape[-1] >= 3: # frame = frame[..., 0] # if frame.dtype == np.uint8: # frame = frame.astype(np.float32) / 255.0 # rgba_frame = cmap(frame) # frame = (rgba_frame[..., :3] * 255).clip(0, 255).astype(np.uint8) # else: # if frame.dtype == np.float32 or frame.dtype == np.float64: # frame = (frame * 255).clip(0, 255).astype(np.uint8) # writer.append_data(frame) # writer.close() def save_frames(frames, save_path): os.makedirs(save_path, exist_ok=True) for i, frame in enumerate(tqdm(frames, desc="Saving images")): frame.save(os.path.join(save_path, f"{i}.png")) frame.save(os.path.join(save_path, f"{i}.png")) frame.save(os.path.join(save_path, f"{i}.png"))