# STLDM_official / nowcasting / helpers / visualization.py
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.colors import hsv_to_rgb
import cv2
import moviepy.editor as mpy
import numpy as np
from nowcasting.helpers.gifmaker import save_gif
def flow_to_img(flow_dat, max_displacement=None):
    """Convert optical flow data to RGB images via an HSV encoding.

    Flow direction is mapped to hue and flow magnitude to saturation
    (value is fixed at 1).

    Parameters
    ----------
    flow_dat : np.ndarray
        Shape: (seq_len, 2, H, W); channel 0 is x-flow, channel 1 is y-flow.
    max_displacement : float or None
        Magnitude used to normalize saturation. If None, the maximum
        magnitude found in `flow_dat` is used.

    Returns
    -------
    rgb_dat : np.ndarray
        Shape: (seq_len, 3, H, W), values in [0, 1].
    """
    assert flow_dat.ndim == 4
    # Squared magnitude per pixel, keepdims so it broadcasts as a channel.
    flow_scale = np.square(flow_dat).sum(axis=1, keepdims=True)
    flow_x = flow_dat[:, :1, :, :]
    flow_y = flow_dat[:, 1:, :, :]
    flow_angle = np.arctan2(flow_y, flow_x)
    # Shift angles into [0, 2*pi) so hue is non-negative.
    flow_angle[flow_angle < 0] += np.pi * 2
    v = np.ones((flow_dat.shape[0], 1, flow_dat.shape[2], flow_dat.shape[3]),
                dtype=np.float32)
    if max_displacement is None:
        flow_scale_max = np.sqrt(flow_scale.max())
    else:
        flow_scale_max = max_displacement
    h = flow_angle / (2 * np.pi)
    # Guard against division by zero (all-zero flow or max_displacement == 0)
    # and clip so saturation stays in [0, 1] even when max_displacement
    # underestimates the true maximum magnitude.
    if flow_scale_max > 0:
        s = np.clip(np.sqrt(flow_scale) / flow_scale_max, 0.0, 1.0)
    else:
        s = np.zeros_like(flow_scale)
    hsv_dat = np.concatenate((h, s, v), axis=1)
    # hsv_to_rgb wants channels-last; convert back to channels-first after.
    rgb_dat = hsv_to_rgb(hsv_dat.transpose((0, 2, 3, 1))).transpose((0, 3, 1, 2))
    return rgb_dat
def _ax_imshow(ax, im, **kwargs):
    """Show a grayscale or color image on a matplotlib axis.

    Parameters
    ----------
    ax : matplotlib axes
        Target axes; axis ticks are turned off and autoscale disabled.
    im : np.ndarray
        Either (H, W), (1, H, W) grayscale, or (3, H, W) RGB (channels-first).
    **kwargs
        Forwarded to ``ax.imshow``.

    Raises
    ------
    NotImplementedError
        If a 3D input has a channel count other than 1 or 3.
    """
    assert im.ndim == 3 or im.ndim == 2
    if im.ndim == 2:
        ax.imshow(im, **kwargs)
    elif im.shape[0] == 1:
        ax.imshow(im[0, :, :], **kwargs)
    elif im.shape[0] == 3:
        # imshow expects channels-last (H, W, 3).
        ax.imshow(im.transpose((1, 2, 0)), **kwargs)
    else:
        raise NotImplementedError
    ax.set_axis_off()
    # 'box-forced' was deprecated in matplotlib 2.2 and removed in 3.0;
    # 'box' is the documented replacement.
    ax.set_adjustable('box')
    ax.autoscale(False)
def get_color_flow_legend_image(size=50):
    """Build an RGBA legend image for the optical-flow color wheel.

    Pixels encode direction as hue and magnitude as saturation; pixels
    whose magnitude exceeds ``size * sqrt(2)`` are made fully transparent.

    Parameters
    ----------
    size : int
        Half-width of the legend; output is (4, 2*size+1, 2*size+1).

    Returns
    -------
    rgba_dat : np.ndarray
        Channels-first RGBA legend image.
    """
    side = size * 2 + 1
    coords = np.arange(-size, size + 1, dtype=np.float32)
    U, V = np.meshgrid(coords, coords)
    magnitude = np.sqrt(U**2 + V**2)
    angle = np.arctan2(V, U)
    # Wrap negative angles into [0, 2*pi) so hue stays non-negative.
    angle[angle < 0] += np.pi * 2
    max_magnitude = float(size) * np.sqrt(2)
    h = angle / (2 * np.pi)
    s = magnitude / max_magnitude
    v = np.ones((side, side), dtype=np.float32)
    hsv_dat = np.concatenate((h.reshape((1, side, side)),
                              s.reshape((1, side, side)),
                              v.reshape((1, side, side))), axis=0)
    # hsv_to_rgb wants channels-last; convert back afterwards.
    rgb_dat = hsv_to_rgb(hsv_dat.transpose((1, 2, 0))).transpose((2, 0, 1))
    alpha = np.ones((1, side, side), dtype=np.float32)
    outside = magnitude > max_magnitude
    # Blank out and hide everything beyond the legend's radius.
    rgb_dat[:, outside] = 0
    alpha[:, outside] = 0
    return np.concatenate((rgb_dat, alpha), axis=0)
def save_hko_gif(im_dat, save_path):
    """Write a sequence of HKO radar frames to an animated gif.

    Parameters
    ----------
    im_dat : np.ndarray
        Shape: (seqlen, H, W)
    save_path : str
        Destination path of the gif file.
    """
    assert im_dat.ndim == 3, "expected a (seqlen, H, W) frame sequence"
    save_gif(im_dat, fname=save_path)
def merge_rgba_cv2(front_img, back_img):
    """Alpha-composite ``front_img`` over ``back_img`` (painter's algorithm).

    Parameters
    ----------
    front_img : np.ndarray
        RGBA image, shape (H, W, 4); uint8 in [0, 255] or float in [0, 1].
    back_img : np.ndarray
        RGBA image with the same shape and value convention.

    Returns
    -------
    result_img : np.ndarray
        uint8 RGBA composite, shape (H, W, 4).
    """
    assert front_img.shape == back_img.shape
    # Normalize both layers to float in [0, 1].
    if front_img.dtype == np.uint8:
        front_img = front_img.astype(np.float32) / 255.0
    if back_img.dtype == np.uint8:
        back_img = back_img.astype(np.float32) / 255.0
    result_img = np.zeros(front_img.shape, dtype=np.float32)
    out_alpha = front_img[:, :, 3] + back_img[:, :, 3] * (1 - front_img[:, :, 3])
    result_img[:, :, 3] = out_alpha
    # Avoid 0/0 -> NaN where both layers are fully transparent; those
    # pixels keep the zero color initialized above instead of casting
    # NaN to uint8 garbage.
    safe_alpha = np.where(out_alpha > 0, out_alpha, 1.0)[:, :, np.newaxis]
    result_img[:, :, :3] = (front_img[:, :, :3] * front_img[:, :, 3:] +
                            back_img[:, :, :3] * back_img[:, :, 3:] *
                            (1 - front_img[:, :, 3:])) / safe_alpha
    result_img = (result_img * 255.0).astype(np.uint8)
    return result_img
def save_hko_movie(im_dat, datetime_list, mask_dat=None, save_path="hko.mp4", masked=False,
                   fps=5, prediction_start=None):
    """Save a sequence of HKO radar frames to a video file.

    Each frame is converted to RGBA, stamped with its timestamp, overlaid
    with the rectangle of the central evaluation region, and encoded with
    moviepy.

    Parameters
    ----------
    im_dat : np.ndarray
        Shape : (seq_len, H, W). float32 frames are rescaled to uint8.
    datetime_list : list
        One datetime per frame; rendered onto the frame as text.
    mask_dat : np.ndarray or None
        Shape : (seq_len, H, W). Only read when ``masked`` is True.
    save_path : str
        Output video path.
    masked : bool
        Whether to multiply each frame by its mask before saving.
    fps : float
        Frames per second of the saved movie.
    prediction_start : int or None
        Index of the first predicted frame; timestamps from this index on
        are drawn in a different color to distinguish them from inputs.
    """
    # Imported locally — presumably to avoid a circular import; confirm.
    from nowcasting.config import cfg
    central_region = cfg.HKO.EVALUATION.CENTRAL_REGION
    seq_len, height, width = im_dat.shape
    display_im_dat = []
    # RGBA mask overlay color; only referenced by the commented-out
    # transparency code below.
    mask_color = np.array((0, 170, 160, 150), dtype=np.float32) / 255.0
    if im_dat.dtype == np.float32:
        # assumes float frames lie in [0, 1] — TODO confirm with callers
        im_dat = (im_dat * 255).astype(np.uint8)
    for i in range(im_dat.shape[0]):
        if not masked:
            color_im_dat = cv2.cvtColor(im_dat[i], cv2.COLOR_GRAY2RGBA)
            im = color_im_dat
        else:
            # Zero out pixels outside the mask, then convert to RGBA.
            im = im_dat[i] * mask_dat[i]
            im = cv2.cvtColor(im, cv2.COLOR_GRAY2RGBA)
        # Uncomment the following code to add transparency to the masks
        # color_im_dat = cv2.cvtColor(im_dat[i], cv2.COLOR_GRAY2RGBA)
        # mask_im_dat = mask_color.reshape((1, 1, 4)) * np.expand_dims(1 - mask_dat[i], axis=2)
        # im = merge_rgba_cv2(front_img=mask_im_dat, back_img=color_im_dat)
        if prediction_start is not None and i >= prediction_start:
            # Predicted frames: timestamp drawn in red.
            cv2.putText(im, text=datetime_list[i].strftime('%Y/%m/%d %H:%M'),
                        org=(0, 20), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.4,
                        color=(255, 0, 0, 0))
        else:
            # Input/ground-truth frames: timestamp drawn in white.
            cv2.putText(im, text=datetime_list[i].strftime('%Y/%m/%d %H:%M'),
                        org=(0, 20), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.4,
                        color=(255, 255, 255, 0))
        # Green rectangle marking the central evaluation region.
        cv2.rectangle(im,
                      pt1=(central_region[0], central_region[1]),
                      pt2=(central_region[2], central_region[3]),
                      color=(0, 255, 0, 0))
        display_im_dat.append(im)
    clip = mpy.ImageSequenceClip(display_im_dat, with_mask=False, fps=fps)
    # NOTE(review): the `verbose` kwarg was removed in moviepy >= 2.0 —
    # confirm the pinned moviepy version still accepts it.
    clip.write_videofile(save_path, audio=False, verbose=False, threads=4)