|
|
from multiprocessing import Process, Queue, Value
|
|
|
import os
|
|
|
from pathlib import Path
|
|
|
import queue
|
|
|
import time
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
from PIL import Image
|
|
|
|
|
|
import torch
|
|
|
from dataset.range_transform import inv_im_trans
|
|
|
from collections import defaultdict
|
|
|
|
|
|
|
|
|
def tensor_to_numpy(image):
|
|
|
image_np = (image.numpy() * 255).astype('uint8')
|
|
|
return image_np
|
|
|
|
|
|
def tensor_to_np_float(image):
|
|
|
image_np = image.numpy().astype('float32')
|
|
|
return image_np
|
|
|
|
|
|
def detach_to_cpu(x):
|
|
|
return x.detach().cpu()
|
|
|
|
|
|
def transpose_np(x):
|
|
|
return np.transpose(x, [1,2,0])
|
|
|
|
|
|
def tensor_to_gray_im(x):
|
|
|
x = detach_to_cpu(x)
|
|
|
x = tensor_to_numpy(x)
|
|
|
x = transpose_np(x)
|
|
|
return x
|
|
|
|
|
|
def tensor_to_im(x):
|
|
|
x = detach_to_cpu(x)
|
|
|
x = inv_im_trans(x).clamp(0, 1)
|
|
|
x = tensor_to_numpy(x)
|
|
|
x = transpose_np(x)
|
|
|
return x
|
|
|
|
|
|
|
|
|
key_captions = {
|
|
|
'im': 'Image',
|
|
|
'gt': 'GT',
|
|
|
}
|
|
|
|
|
|
"""
|
|
|
Return an image array with captions
|
|
|
keys in dictionary will be used as caption if not provided
|
|
|
values should contain lists of cv2 images
|
|
|
"""
|
|
|
def get_image_array(images, grid_shape, captions={}):
|
|
|
h, w = grid_shape
|
|
|
cate_counts = len(images)
|
|
|
rows_counts = len(next(iter(images.values())))
|
|
|
|
|
|
font = cv2.FONT_HERSHEY_SIMPLEX
|
|
|
|
|
|
output_image = np.zeros([w*cate_counts, h*(rows_counts+1), 3], dtype=np.uint8)
|
|
|
col_cnt = 0
|
|
|
for k, v in images.items():
|
|
|
|
|
|
|
|
|
caption = captions.get(k, k)
|
|
|
|
|
|
|
|
|
dy = 40
|
|
|
for i, line in enumerate(caption.split('\n')):
|
|
|
cv2.putText(output_image, line, (10, col_cnt*w+100+i*dy),
|
|
|
font, 0.8, (255,255,255), 2, cv2.LINE_AA)
|
|
|
|
|
|
|
|
|
for row_cnt, img in enumerate(v):
|
|
|
im_shape = img.shape
|
|
|
if len(im_shape) == 2:
|
|
|
img = img[..., np.newaxis]
|
|
|
|
|
|
img = (img * 255).astype('uint8')
|
|
|
|
|
|
output_image[(col_cnt+0)*w:(col_cnt+1)*w,
|
|
|
(row_cnt+1)*h:(row_cnt+2)*h, :] = img
|
|
|
|
|
|
col_cnt += 1
|
|
|
|
|
|
return output_image
|
|
|
|
|
|
def base_transform(im, size):
|
|
|
im = tensor_to_np_float(im)
|
|
|
if len(im.shape) == 3:
|
|
|
im = im.transpose((1, 2, 0))
|
|
|
else:
|
|
|
im = im[:, :, None]
|
|
|
|
|
|
|
|
|
if im.shape[1] != size:
|
|
|
im = cv2.resize(im, size, interpolation=cv2.INTER_NEAREST)
|
|
|
|
|
|
return im.clip(0, 1)
|
|
|
|
|
|
def im_transform(im, size):
|
|
|
return base_transform(inv_im_trans(detach_to_cpu(im)), size=size)
|
|
|
|
|
|
def mask_transform(mask, size):
|
|
|
return base_transform(detach_to_cpu(mask), size=size)
|
|
|
|
|
|
def out_transform(mask, size):
|
|
|
return base_transform(detach_to_cpu(torch.sigmoid(mask)), size=size)
|
|
|
|
|
|
def pool_pairs(images, size, num_objects):
|
|
|
req_images = defaultdict(list)
|
|
|
|
|
|
b, t = images['rgb'].shape[:2]
|
|
|
|
|
|
|
|
|
b = min(2, b)
|
|
|
|
|
|
|
|
|
max_num_objects = max(num_objects[:b])
|
|
|
|
|
|
GT_suffix = ''
|
|
|
for bi in range(b):
|
|
|
GT_suffix += ' \n%s' % images['info']['name'][bi][-25:-4]
|
|
|
|
|
|
for bi in range(b):
|
|
|
for ti in range(t):
|
|
|
req_images['RGB'].append(im_transform(images['rgb'][bi,ti], size))
|
|
|
for oi in range(max_num_objects):
|
|
|
if ti == 0 or oi >= num_objects[bi]:
|
|
|
req_images['Mask_%d'%oi].append(mask_transform(images['first_frame_gt'][bi][0,oi], size))
|
|
|
|
|
|
|
|
|
else:
|
|
|
req_images['Mask_%d'%oi].append(mask_transform(images['masks_%d'%ti][bi][oi], size))
|
|
|
|
|
|
|
|
|
|
|
|
req_images['GT_%d_%s'%(oi, GT_suffix)].append(mask_transform(images['cls_gt'][bi,ti,0]==(oi+1), size))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return get_image_array(req_images, size, key_captions)
|
|
|
|
|
|
def _check_if_black_and_white(img: Image.Image):
|
|
|
unique_colors = img.getcolors()
|
|
|
if len(unique_colors) > 2:
|
|
|
return False
|
|
|
|
|
|
if len(unique_colors) == 1:
|
|
|
return True
|
|
|
|
|
|
for _, color_rgb in unique_colors:
|
|
|
if color_rgb == (255, 255, 255):
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
def create_overlay(img: Image.Image, mask: Image.Image, mask_alpha=0.5, color_if_black_and_white=(255, 255, 255)):
|
|
|
mask = mask.convert('RGB')
|
|
|
is_b_and_w = _check_if_black_and_white(mask)
|
|
|
|
|
|
if img.size != mask.size:
|
|
|
mask = mask.resize(img.size, resample=Image.NEAREST)
|
|
|
|
|
|
mask_arr = np.array(mask)
|
|
|
|
|
|
if is_b_and_w:
|
|
|
mask_arr = np.where(mask_arr, np.array(color_if_black_and_white), mask_arr).astype(np.uint8)
|
|
|
mask = Image.fromarray(mask_arr, mode='RGB')
|
|
|
|
|
|
alpha_mask = np.full(mask_arr.shape[0:2], 255)
|
|
|
alpha_mask[cv2.cvtColor(mask_arr, cv2.COLOR_BGR2GRAY) > 0] = int(mask_alpha * 255)
|
|
|
|
|
|
overlay = Image.composite(img, mask, Image.fromarray(alpha_mask.astype(np.uint8), mode='L'))
|
|
|
|
|
|
return overlay
|
|
|
|
|
|
def save_image(img: Image.Image, frame_name, video_name, general_dir_path, sub_dir_name='masks', extension='.png'):
|
|
|
this_out_path = os.path.join(general_dir_path, video_name, sub_dir_name)
|
|
|
os.makedirs(this_out_path, exist_ok=True)
|
|
|
|
|
|
img_save_path = os.path.join(this_out_path, frame_name[:-4] + extension)
|
|
|
img.save(img_save_path)
|
|
|
|
|
|
|
|
|
class ParallelImageSaver:
|
|
|
"""
|
|
|
A class for parallel saving of masks and / or overlay images using multiple processes.
|
|
|
Composing overlays and saving images on the drive is pretty slow, this class does it in the background.
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
general_output_path : str
|
|
|
The general path where images and masks will be saved.
|
|
|
vid_name : str
|
|
|
The name of the video or identifier for the output files.
|
|
|
overlay_color_if_b_and_w : tuple, optional
|
|
|
The RGB color to use for masks when there is only one object. Default is (255, 255, 255) (white).
|
|
|
max_queue_size : int, optional
|
|
|
The maximum size of the mask and overlay queues. Default is 200.
|
|
|
|
|
|
Methods
|
|
|
-------
|
|
|
save_mask(mask, frame_name)
|
|
|
Start saving a mask in the background.
|
|
|
save_overlay(orig_img, mask, frame_name)
|
|
|
Create an overlay given an image and a mask, and start saving it in the background.
|
|
|
qsize() -> Tuple(int, int)
|
|
|
Get the current size of the mask and overlay queues (how many frames are still left to process).
|
|
|
__enter__()
|
|
|
Enter the context manager and return the instance itself.
|
|
|
__exit__(exc_type, exc_value, exc_tb)
|
|
|
Exit the context manager and handle cleanup.
|
|
|
wait_for_jobs_to_finish(verbose=False)
|
|
|
Wait for all saving jobs to finish. Optional, will be called automatically in __exit__. Only recommened to use if you want to print verbose progress.
|
|
|
|
|
|
Examples
|
|
|
--------
|
|
|
# Example usage of ParallelImageSaver class
|
|
|
with ParallelImageSaver("/output/directory", "video_1", overlay_color_if_b_and_w=(100, 100, 100)) as image_saver:
|
|
|
image = Image.open("img.jpg")
|
|
|
mask = Image.open("mask.png")
|
|
|
|
|
|
# These will be saved in parallel in background processes
|
|
|
image_saver.save_mask(mask_image, "frame_000001")
|
|
|
image_saver.save_overlay(image, mask, "frame_000001")
|
|
|
|
|
|
image_saver.wait_for_jobs_to_finish(verbose=True) # Optional
|
|
|
|
|
|
# The images will be saved in separate processes in the background.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, general_output_path: str, vid_name: str, overlay_color_if_b_and_w=(255, 255, 255), max_queue_size=200) -> None:
|
|
|
self._mask_queue = Queue(max_queue_size)
|
|
|
self._overlay_queue = Queue(max_queue_size)
|
|
|
|
|
|
self._mask_saver_worker = None
|
|
|
self._overlay_saver_worker = None
|
|
|
|
|
|
self._p_out = Path(general_output_path)
|
|
|
self._vid_name = vid_name
|
|
|
self._object_color = overlay_color_if_b_and_w
|
|
|
self._finished = Value('b', False)
|
|
|
|
|
|
def save_mask(self, mask: Image.Image, frame_name: str):
|
|
|
self._mask_queue.put((mask, frame_name, 'masks', '.png'))
|
|
|
|
|
|
if self._mask_saver_worker is None:
|
|
|
self._mask_saver_worker = Process(target=self._save_mask_fn)
|
|
|
self._mask_saver_worker.start()
|
|
|
|
|
|
def save_overlay(self, orig_img: Image.Image, mask: Image.Image, frame_name: str):
|
|
|
self._overlay_queue.put((orig_img, mask, frame_name, 'overlay', '.jpg'))
|
|
|
|
|
|
if self._overlay_saver_worker is None:
|
|
|
self._overlay_saver_worker = Process(target=self._save_overlay_fn)
|
|
|
self._overlay_saver_worker.start()
|
|
|
|
|
|
def _save_mask_fn(self):
|
|
|
while True:
|
|
|
try:
|
|
|
mask, frame_name, subdir, extension = self._mask_queue.get_nowait()
|
|
|
except queue.Empty:
|
|
|
if self._finished.value:
|
|
|
return
|
|
|
else:
|
|
|
time.sleep(1)
|
|
|
continue
|
|
|
save_image(mask, frame_name, self._vid_name, self._p_out, subdir, extension)
|
|
|
|
|
|
def _save_overlay_fn(self):
|
|
|
while True:
|
|
|
try:
|
|
|
orig_image, mask, frame_name, subdir, extension = self._overlay_queue.get_nowait()
|
|
|
except queue.Empty:
|
|
|
if self._finished.value:
|
|
|
return
|
|
|
else:
|
|
|
time.sleep(1)
|
|
|
continue
|
|
|
overlaid_img = create_overlay(orig_image, mask, color_if_black_and_white=self._object_color)
|
|
|
save_image(overlaid_img, frame_name, self._vid_name, self._p_out, subdir, extension)
|
|
|
|
|
|
def qsize(self):
|
|
|
return self._mask_queue.qsize(), self._overlay_queue.qsize()
|
|
|
|
|
|
def __enter__(self):
|
|
|
|
|
|
return self
|
|
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_tb):
|
|
|
if exc_type is not None:
|
|
|
|
|
|
|
|
|
if self._mask_saver_worker is not None:
|
|
|
self._mask_saver_worker.kill()
|
|
|
if self._mask_saver_worker is not None:
|
|
|
self._mask_saver_worker.kill()
|
|
|
|
|
|
raise exc_value
|
|
|
else:
|
|
|
self.wait_for_jobs_to_finish(verbose=False)
|
|
|
if self._mask_saver_worker is not None:
|
|
|
self._mask_saver_worker.close()
|
|
|
|
|
|
if self._overlay_saver_worker is not None:
|
|
|
self._overlay_saver_worker.close()
|
|
|
|
|
|
def wait_for_jobs_to_finish(self, verbose=False):
|
|
|
|
|
|
|
|
|
self._finished.value = True
|
|
|
|
|
|
if not verbose:
|
|
|
if self._mask_saver_worker is not None:
|
|
|
self._mask_saver_worker.join()
|
|
|
|
|
|
if self._overlay_saver_worker is not None:
|
|
|
self._overlay_saver_worker.join()
|
|
|
|
|
|
else:
|
|
|
while True:
|
|
|
masks_left, overlays_left = self.qsize()
|
|
|
if max(masks_left, overlays_left) > 0:
|
|
|
print(f"Finishing saving the results, {masks_left:>4d} masks and {overlays_left:>4d} overlays left.")
|
|
|
time.sleep(1)
|
|
|
else:
|
|
|
break
|
|
|
|
|
|
self.wait_for_jobs_to_finish(verbose=False)
|
|
|
print("All saving jobs finished") |