File size: 6,476 Bytes
6021dd1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.colors import hsv_to_rgb
import cv2
import moviepy.editor as mpy
import numpy as np
from nowcasting.helpers.gifmaker import save_gif
def flow_to_img(flow_dat, max_displacement=None):
"""Convert optical flow data to HSV images
Parameters
----------
flow_dat : np.ndarray
Shape: (seq_len, 2, H, W)
max_displacement : float or None
Returns
-------
rgb_dat : np.ndarray
Shape: (seq_len, 3, H, W)
"""
assert flow_dat.ndim == 4
flow_scale = np.square(flow_dat).sum(axis=1, keepdims=True)
flow_x = flow_dat[:, :1, :, :]
flow_y = flow_dat[:, 1:, :, :]
flow_angle = np.arctan2(flow_y, flow_x)
flow_angle[flow_angle < 0] += np.pi * 2
v = np.ones((flow_dat.shape[0], 1, flow_dat.shape[2], flow_dat.shape[3]),
dtype=np.float32)
if max_displacement is None:
flow_scale_max = np.sqrt(flow_scale.max())
else:
flow_scale_max = max_displacement
h = flow_angle / (2 * np.pi)
s = np.sqrt(flow_scale) / flow_scale_max
hsv_dat = np.concatenate((h, s, v), axis=1)
rgb_dat = hsv_to_rgb(hsv_dat.transpose((0, 2, 3, 1))).transpose((0, 3, 1, 2))
return rgb_dat
def _ax_imshow(ax, im, **kwargs):
assert im.ndim == 3 or im.ndim == 2
if im.ndim == 2:
ax.imshow(im, **kwargs)
ax.set_axis_off()
else:
if im.shape[0] == 1:
ax.imshow(im[0, :, :], **kwargs)
ax.set_axis_off()
elif im.shape[0] == 3:
ax.imshow(im.transpose((1, 2, 0)), **kwargs)
ax.set_axis_off()
else:
raise NotImplementedError
ax.set_adjustable('box-forced')
ax.autoscale(False)
def get_color_flow_legend_image(size=50):
U, V = np.meshgrid(np.arange(-size, size + 1, dtype=np.float32),
np.arange(-size, size + 1, dtype=np.float32))
flow_scale = np.sqrt(U**2 + V**2)
flow_angle = np.arctan2(V, U)
flow_angle[flow_angle < 0] += np.pi * 2
max_flow_scale = float(size) * np.sqrt(2)
h = flow_angle / (2 * np.pi)
s = flow_scale / max_flow_scale
v = np.ones((size * 2 + 1, size * 2 + 1),
dtype=np.float32)
hsv_dat = np.concatenate((h.reshape((1, size * 2 + 1, size * 2 + 1)),
s.reshape((1, size * 2 + 1, size * 2 + 1)),
v.reshape((1, size * 2 + 1, size * 2 + 1))), axis=0)
rgb_dat = hsv_to_rgb(hsv_dat.transpose((1, 2, 0))).transpose((2, 0, 1))
a = np.ones((1, size * 2 + 1, size * 2 + 1), dtype=np.float32)
rgb_dat[:, flow_scale > max_flow_scale] = 0
a[:, flow_scale > max_flow_scale] = 0
rgba_dat = np.concatenate((rgb_dat, a), axis=0)
return rgba_dat
def save_hko_gif(im_dat, save_path):
"""Save the HKO images to gif
Parameters
----------
im_dat : np.ndarray
Shape: (seqlen, H, W)
save_path : str
Returns
-------
"""
assert im_dat.ndim == 3
save_gif(im_dat, fname=save_path)
return
def merge_rgba_cv2(front_img, back_img):
"""Merge the front image with the background image using the `Painter's algorithm`
Parameters
----------
front_img : np.ndarray
back_img : np.ndarray
Returns
-------
result_img : np.ndarray
"""
assert front_img.shape == back_img.shape
if front_img.dtype == np.uint8:
front_img = front_img.astype(np.float32) / 255.0
if back_img.dtype == np.uint8:
back_img = back_img.astype(np.float32) / 255.0
result_img = np.zeros(front_img.shape, dtype=np.float32)
result_img[:, :, 3] = front_img[:, :, 3] + back_img[:, :, 3] * (1 - front_img[:, :, 3])
result_img[:, :, :3] = (front_img[:, :, :3] * front_img[:, :, 3:] +
back_img[:, :, :3] * back_img[:, :, 3:] * (1 - front_img[:, :, 3:])) /\
result_img[:, :, 3:]
result_img = (result_img * 255.0).astype(np.uint8)
return result_img
def save_hko_movie(im_dat, datetime_list, mask_dat=None, save_path="hko.mp4", masked=False,
fps=5, prediction_start=None):
"""Save the HKO images to a video file
Parameters
----------
im_dat : np.ndarray
Shape : (seq_len, H, W)
datetime_list : list
list of datetimes
mask_dat : np.ndarray or None
Shape : (seq_len, H, W)
save_path : str
masked : bool
whether the mask the inputs when saving the image
fps : float
the fps of the saved movie
prediction_start : int or None
The starting point of the prediction
"""
from nowcasting.config import cfg
central_region = cfg.HKO.EVALUATION.CENTRAL_REGION
seq_len, height, width = im_dat.shape
display_im_dat = []
mask_color = np.array((0, 170, 160, 150), dtype=np.float32) / 255.0
if im_dat.dtype == np.float32:
im_dat = (im_dat * 255).astype(np.uint8)
for i in range(im_dat.shape[0]):
if not masked:
color_im_dat = cv2.cvtColor(im_dat[i], cv2.COLOR_GRAY2RGBA)
im = color_im_dat
else:
im = im_dat[i] * mask_dat[i]
im = cv2.cvtColor(im, cv2.COLOR_GRAY2RGBA)
# Uncomment the following code to add transparency to the masks
# color_im_dat = cv2.cvtColor(im_dat[i], cv2.COLOR_GRAY2RGBA)
# mask_im_dat = mask_color.reshape((1, 1, 4)) * np.expand_dims(1 - mask_dat[i], axis=2)
# im = merge_rgba_cv2(front_img=mask_im_dat, back_img=color_im_dat)
if prediction_start is not None and i >= prediction_start:
cv2.putText(im, text=datetime_list[i].strftime('%Y/%m/%d %H:%M'),
org=(0, 20), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.4,
color=(255, 0, 0, 0))
else:
cv2.putText(im, text=datetime_list[i].strftime('%Y/%m/%d %H:%M'),
org=(0, 20), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.4,
color=(255, 255, 255, 0))
cv2.rectangle(im,
pt1=(central_region[0], central_region[1]),
pt2=(central_region[2], central_region[3]),
color=(0, 255, 0, 0))
display_im_dat.append(im)
clip = mpy.ImageSequenceClip(display_im_dat, with_mask=False, fps=fps)
clip.write_videofile(save_path, audio=False, verbose=False, threads=4)
|