|
|
import matplotlib.pyplot as plt |
|
|
import matplotlib.gridspec as gridspec |
|
|
from matplotlib.colors import hsv_to_rgb |
|
|
import cv2 |
|
|
import moviepy.editor as mpy |
|
|
import numpy as np |
|
|
from nowcasting.helpers.gifmaker import save_gif |
|
|
|
|
|
def flow_to_img(flow_dat, max_displacement=None):
    """Convert optical flow data to RGB images through an HSV encoding.

    The flow direction is mapped to hue, the flow magnitude (normalised by the
    largest displacement) is mapped to saturation and the value channel is
    fixed at 1, so zero flow has zero saturation.

    Parameters
    ----------
    flow_dat : np.ndarray
        Shape: (seq_len, 2, H, W). Channel 0 is used as the x component and
        channel 1 as the y component of the flow.
    max_displacement : float or None
        Displacement that corresponds to full saturation. If None, the largest
        magnitude found in `flow_dat` is used. Larger magnitudes are saturated
        at 1 instead of producing out-of-range colours.

    Returns
    -------
    rgb_dat : np.ndarray
        Shape: (seq_len, 3, H, W)
    """
    assert flow_dat.ndim == 4
    flow_x = flow_dat[:, :1, :, :]
    flow_y = flow_dat[:, 1:, :, :]
    flow_magnitude = np.sqrt(np.square(flow_dat).sum(axis=1, keepdims=True))

    # arctan2 yields angles in (-pi, pi]; shift the negative half to get [0, 2*pi).
    flow_angle = np.arctan2(flow_y, flow_x)
    flow_angle[flow_angle < 0] += np.pi * 2

    if max_displacement is None:
        flow_scale_max = flow_magnitude.max()
    else:
        flow_scale_max = max_displacement

    h = flow_angle / (2 * np.pi)
    if flow_scale_max > 0:
        # Clip so that displacements above `max_displacement` stay valid HSV.
        s = np.clip(flow_magnitude / flow_scale_max, 0.0, 1.0)
    else:
        # No usable scale (e.g. an all-zero flow field): avoid 0 / 0 -> NaN.
        s = np.zeros_like(flow_magnitude)
    v = np.ones((flow_dat.shape[0], 1, flow_dat.shape[2], flow_dat.shape[3]),
                dtype=np.float32)

    hsv_dat = np.concatenate((h, s, v), axis=1)
    # hsv_to_rgb expects the colour channel last, so go NCHW -> NHWC and back.
    rgb_dat = hsv_to_rgb(hsv_dat.transpose((0, 2, 3, 1))).transpose((0, 3, 1, 2))
    return rgb_dat
|
|
|
|
|
|
|
|
def _ax_imshow(ax, im, **kwargs):
    """Show an image on `ax` with the axis decorations switched off.

    Parameters
    ----------
    ax : object
        Axes-like object providing `imshow`, `set_axis_off`, `set_adjustable`
        and `autoscale` (presumably a matplotlib Axes).
    im : np.ndarray
        Either a 2D image of shape (H, W) or a channel-first image of shape
        (C, H, W) with C in {1, 3, 4}.
    **kwargs
        Forwarded unchanged to `ax.imshow`.

    Raises
    ------
    NotImplementedError
        If `im` is 3D and its channel count is not 1, 3 or 4.
    """
    assert im.ndim == 3 or im.ndim == 2
    if im.ndim == 2:
        frame = im
    elif im.shape[0] == 1:
        frame = im[0, :, :]
    elif im.shape[0] in (3, 4):
        # imshow wants the colour channel last: (C, H, W) -> (H, W, C).
        frame = im.transpose((1, 2, 0))
    else:
        raise NotImplementedError
    ax.imshow(frame, **kwargs)
    ax.set_axis_off()
    try:
        # 'box-forced' was removed in Matplotlib 3.1; 'box' is its replacement.
        ax.set_adjustable('box')
    except ValueError:
        # Old Matplotlib releases reject 'box' on shared axes and need the
        # legacy spelling instead.
        ax.set_adjustable('box-forced')
    ax.autoscale(False)
|
|
|
|
|
|
|
|
def get_color_flow_legend_image(size=50):
    """Build the colour legend used to read flow visualisations.

    Every pixel of a (2 * size + 1) x (2 * size + 1) grid stands for the
    displacement (U, V) measured from the grid centre. The direction of that
    displacement sets the hue and its length, divided by `size * sqrt(2)`,
    sets the saturation.

    Parameters
    ----------
    size : int
        Half-width of the legend; coordinates run from -size to size.

    Returns
    -------
    rgba_dat : np.ndarray
        Shape: (4, 2 * size + 1, 2 * size + 1). Pixels whose displacement
        exceeds the maximum scale are zeroed in all four channels.
    """
    side = size * 2 + 1
    coords = np.arange(-size, size + 1, dtype=np.float32)
    U, V = np.meshgrid(coords, coords)

    magnitude = np.sqrt(U**2 + V**2)
    angle = np.arctan2(V, U)
    # Shift negative angles so the hue below covers [0, 1).
    angle[angle < 0] += np.pi * 2
    max_magnitude = float(size) * np.sqrt(2)

    hue = angle / (2 * np.pi)
    saturation = magnitude / max_magnitude
    value = np.ones((side, side), dtype=np.float32)

    # Stack channel-last for hsv_to_rgb, then move the channel axis to the front.
    hsv = np.stack((hue, saturation, value), axis=-1)
    rgb = hsv_to_rgb(hsv).transpose((2, 0, 1))
    alpha = np.ones((1, side, side), dtype=np.float32)

    outside = magnitude > max_magnitude
    rgb[:, outside] = 0
    alpha[:, outside] = 0
    return np.concatenate((rgb, alpha), axis=0)
|
|
|
|
|
|
|
|
def save_hko_gif(im_dat, save_path):
    """Write a sequence of HKO frames to a gif file.

    The work is delegated to `save_gif`, called with `fname=save_path`.

    Parameters
    ----------
    im_dat : np.ndarray
        Shape: (seqlen, H, W)
    save_path : str
        Destination passed to `save_gif` as `fname`.

    Returns
    -------
    None
    """
    assert im_dat.ndim == 3
    save_gif(im_dat, fname=save_path)
|
|
|
|
|
|
|
|
def merge_rgba_cv2(front_img, back_img):
    """Merge the front image with the background image using the `Painter's algorithm`

    The front image is alpha-composited over the background image:

        out_alpha = front_alpha + back_alpha * (1 - front_alpha)
        out_rgb = (front_rgb * front_alpha
                   + back_rgb * back_alpha * (1 - front_alpha)) / out_alpha

    Pixels that are fully transparent in both inputs have `out_alpha == 0`;
    their colour is set to 0 instead of dividing by zero.

    Parameters
    ----------
    front_img : np.ndarray
        Shape: (H, W, 4), RGBA. uint8 input is rescaled from [0, 255] to
        [0, 1]; any other dtype is used as given.
    back_img : np.ndarray
        Same shape as `front_img`, with the same dtype handling.

    Returns
    -------
    result_img : np.ndarray
        Shape: (H, W, 4), dtype uint8
    """
    assert front_img.shape == back_img.shape
    if front_img.dtype == np.uint8:
        front_img = front_img.astype(np.float32) / 255.0
    if back_img.dtype == np.uint8:
        back_img = back_img.astype(np.float32) / 255.0

    # Keep the trailing axis (3:) so the alpha planes broadcast against RGB.
    front_alpha = front_img[:, :, 3:]
    back_alpha = back_img[:, :, 3:]
    out_alpha = front_alpha + back_alpha * (1 - front_alpha)
    weighted_rgb = (front_img[:, :, :3] * front_alpha +
                    back_img[:, :, :3] * back_alpha * (1 - front_alpha))

    # Where out_alpha is 0 the weighted colour is 0 as well, so dividing by 1
    # there gives 0 rather than the NaN produced by 0 / 0.
    safe_alpha = np.where(out_alpha > 0, out_alpha, 1.0)

    result_img = np.zeros(front_img.shape, dtype=np.float32)
    result_img[:, :, 3:] = out_alpha
    result_img[:, :, :3] = weighted_rgb / safe_alpha
    result_img = (result_img * 255.0).astype(np.uint8)
    return result_img
|
|
|
|
|
|
|
|
def save_hko_movie(im_dat, datetime_list, mask_dat=None, save_path="hko.mp4", masked=False,
                   fps=5, prediction_start=None):
    """Save the HKO images to a video file

    Each frame is converted to RGBA, stamped with its timestamp
    ('%Y/%m/%d %H:%M') in the top-left corner and overlaid with a green
    rectangle marking `cfg.HKO.EVALUATION.CENTRAL_REGION`. The timestamp is
    white, or red for frames at or after `prediction_start`.

    Parameters
    ----------
    im_dat : np.ndarray
        Shape : (seq_len, H, W). Floating point input is clipped to [0, 1] and
        rescaled to uint8; other dtypes are passed to OpenCV unchanged.
    datetime_list : list
        list of datetimes, one per frame
    mask_dat : np.ndarray or None
        Shape : (seq_len, H, W). Required when `masked` is True.
    save_path : str
    masked : bool
        whether to mask the inputs when saving the image
    fps : float
        the fps of the saved movie
    prediction_start : int or None
        The starting point of the prediction

    Raises
    ------
    ValueError
        If `im_dat` is not 3D, or if `masked` is True but `mask_dat` is None.
    """
    # Imported here as in the original code (kept local, presumably to avoid
    # loading the project config at module import time).
    from nowcasting.config import cfg

    if im_dat.ndim != 3:
        raise ValueError("im_dat must have shape (seq_len, H, W), got %s"
                         % (im_dat.shape,))
    if masked and mask_dat is None:
        raise ValueError("mask_dat must be provided when masked=True")

    central_region = cfg.HKO.EVALUATION.CENTRAL_REGION
    if np.issubdtype(im_dat.dtype, np.floating):
        # Handle every float dtype (not only float32) and saturate instead of
        # wrapping around on values outside [0, 1].
        im_dat = (np.clip(im_dat, 0.0, 1.0) * 255).astype(np.uint8)

    display_im_dat = []
    for i, frame in enumerate(im_dat):
        if masked:
            # The product takes a wider dtype when the mask is not uint8/bool;
            # cast back so cvtColor receives the same dtype as the frame.
            frame = (frame * mask_dat[i]).astype(im_dat.dtype)
        im = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGBA)

        is_prediction = prediction_start is not None and i >= prediction_start
        text_color = (255, 0, 0, 0) if is_prediction else (255, 255, 255, 0)
        cv2.putText(im, text=datetime_list[i].strftime('%Y/%m/%d %H:%M'),
                    org=(0, 20), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.4,
                    color=text_color)
        cv2.rectangle(im,
                      pt1=(central_region[0], central_region[1]),
                      pt2=(central_region[2], central_region[3]),
                      color=(0, 255, 0, 0))
        display_im_dat.append(im)

    clip = mpy.ImageSequenceClip(display_im_dat, with_mask=False, fps=fps)
    clip.write_videofile(save_path, audio=False, verbose=False, threads=4)
|
|
|