| |
|
| | import cv2
|
| | import numpy as np
|
| | import os
|
| |
|
| |
|
def flowread(flow_path, quantize=False, concat_axis=0, *args, **kwargs):
    """Read an optical flow map.

    Args:
        flow_path (ndarray or str): Path of the flow file, or an already
            loaded flow array of shape (h, w, 2), which is validated and
            returned as float32.
        quantize (bool): whether to read quantized pair, if set to True,
            remaining args will be passed to :func:`dequantize_flow`.
        concat_axis (int): The axis that dx and dy are concatenated,
            can be either 0 or 1. Ignored if quantize is False.

    Returns:
        ndarray: Optical flow represented as a (h, w, 2) float32 numpy array.

    Raises:
        ValueError: If an ndarray is passed whose shape is not (h, w, 2).
        IOError: If the file cannot be decoded as an image (quantized mode),
            or is not a complete, well-formed ``.flo`` file otherwise.
    """
    # The documented contract accepts an in-memory flow as well as a path.
    if isinstance(flow_path, np.ndarray):
        if flow_path.ndim != 3 or flow_path.shape[-1] != 2:
            raise ValueError(f'Invalid flow with shape {flow_path.shape}')
        return flow_path.astype(np.float32)

    if quantize:
        assert concat_axis in [0, 1]
        cat_flow = cv2.imread(flow_path, cv2.IMREAD_UNCHANGED)
        # cv2.imread signals failure by returning None instead of raising.
        if cat_flow is None:
            raise IOError(f'Failed to read quantized flow file: {flow_path}')
        if cat_flow.ndim != 2:
            raise IOError(f'{flow_path} is not a valid quantized flow file, '
                          f'its dimension is {cat_flow.ndim}.')
        assert cat_flow.shape[concat_axis] % 2 == 0
        dx, dy = np.split(cat_flow, 2, axis=concat_axis)
        flow = dequantize_flow(dx, dy, *args, **kwargs)
    else:
        with open(flow_path, 'rb') as f:
            try:
                header = f.read(4).decode('utf-8')
            except UnicodeDecodeError as err:
                raise IOError(f'Invalid flow file: {flow_path}') from err
            if header != 'PIEH':
                raise IOError(f'Invalid flow file: {flow_path}, '
                              'header does not contain PIEH')

            # Width comes first, then height, both stored as int32.
            dims = np.fromfile(f, np.int32, 2)
            if dims.size != 2:
                raise IOError(f'Invalid flow file: {flow_path}, '
                              'missing width/height')
            # Plain Python ints avoid int32 overflow in w * h * 2.
            w, h = (int(v) for v in dims)
            if w < 0 or h < 0:
                # A negative count would make np.fromfile read everything.
                raise IOError(f'Invalid flow file: {flow_path}, '
                              f'bad size {w}x{h}')

            expected = w * h * 2
            flow = np.fromfile(f, np.float32, expected)
            if flow.size != expected:
                raise IOError(f'Invalid flow file: {flow_path}, expected '
                              f'{expected} values but found {flow.size}')
            flow = flow.reshape((h, w, 2))

    return flow.astype(np.float32)
|
| |
|
| |
|
def flowwrite(flow, filename, quantize=False, concat_axis=0, *args, **kwargs):
    """Write optical flow to file.

    If the flow is not quantized, it will be saved as a .flo file losslessly,
    otherwise a jpeg image which is lossy but of much smaller size. (dx and dy
    will be concatenated along ``concat_axis`` into a single image if quantize
    is True.)

    The parent directory of ``filename`` is created if it does not exist.

    Args:
        flow (ndarray): (h, w, 2) array of optical flow.
        filename (str): Output filepath.
        quantize (bool): Whether to quantize the flow and save it as a single
            image holding both components. If set to True, remaining args
            will be passed to :func:`quantize_flow`.
        concat_axis (int): The axis that dx and dy are concatenated,
            can be either 0 or 1. Ignored if quantize is False.

    Raises:
        IOError: If the quantized image cannot be written.
    """
    # Create the directory that will contain the file, not the file path.
    parent_dir = os.path.dirname(os.path.abspath(filename))
    os.makedirs(parent_dir, exist_ok=True)

    if not quantize:
        with open(filename, 'wb') as f:
            f.write('PIEH'.encode('utf-8'))
            # .flo layout: magic, width, height, then float32 (dx, dy) pairs.
            np.array([flow.shape[1], flow.shape[0]], dtype=np.int32).tofile(f)
            flow.astype(np.float32).tofile(f)
        return

    assert concat_axis in [0, 1]
    dx, dy = quantize_flow(flow, *args, **kwargs)
    dxdy = np.concatenate((dx, dy), axis=concat_axis)
    # cv2.imwrite takes (filename, image) and returns False on failure.
    if not cv2.imwrite(os.fspath(filename), dxdy):
        raise IOError(f'Failed to write quantized flow to {filename}')
|
| |
|
| |
|
def quantize_flow(flow, max_val=0.02, norm=True):
    """Quantize the two components of a flow map to uint8.

    Each component is clipped to [-max_val, max_val] and mapped onto 255
    levels via :func:`quantize`, so the result is small enough to be dumped
    as jpeg images.

    Args:
        flow (ndarray): (h, w, 2) array of optical flow.
        max_val (float): Maximum value of flow, values beyond
            [-max_val, max_val] will be truncated.
        norm (bool): Whether to divide flow values by image width/height
            (dx by width, dy by height) before quantizing.

    Returns:
        tuple[ndarray]: Quantized dx and dy.
    """
    height, width, _ = flow.shape
    components = (flow[..., 0], flow[..., 1])
    if norm:
        # Express displacements as fractions of the image size.
        components = (components[0] / width, components[1] / height)

    return tuple(
        quantize(comp, -max_val, max_val, 255, np.uint8)
        for comp in components)
|
| |
|
| |
|
def dequantize_flow(dx, dy, max_val=0.02, denorm=True):
    """Rebuild a flow map from its quantized components.

    Args:
        dx (ndarray): Quantized dx, of shape (h, w) or (h, w, 1).
        dy (ndarray): Quantized dy, same shape as ``dx``.
        max_val (float): Maximum value used when quantizing.
        denorm (bool): Whether to multiply flow values with width/height
            (dx by width, dy by height).

    Returns:
        ndarray: Dequantized flow, dx and dy stacked along the last axis.
    """
    assert dx.shape == dy.shape
    assert dx.ndim == 2 or (dx.ndim == 3 and dx.shape[-1] == 1)

    # Both components share one shape, so read the image size once.
    height, width = dx.shape[:2]

    dx = dequantize(dx, -max_val, max_val, 255)
    dy = dequantize(dy, -max_val, max_val, 255)

    if denorm:
        dx *= width
        dy *= height

    return np.dstack((dx, dy))
|
| |
|
| |
|
def quantize(arr, min_val, max_val, levels, dtype=np.int64):
    """Quantize an array of (-inf, inf) to integers in [0, levels-1].

    Values are clipped to [min_val, max_val], and that range is split into
    ``levels`` equal-width bins; each value is replaced by its bin index.

    Args:
        arr (ndarray): Input array.
        min_val (scalar): Minimum value to be clipped.
        max_val (scalar): Maximum value to be clipped.
        levels (int): Quantization levels, must be an int greater than 1.
        dtype (np.type): The type of the quantized array.

    Returns:
        ndarray: Quantized array.

    Raises:
        ValueError: If ``levels`` is not an int greater than 1, or if
            ``min_val`` is not smaller than ``max_val``.
    """
    if not isinstance(levels, int) or levels <= 1:
        raise ValueError(
            f'levels must be a positive integer, but got {levels}')
    if min_val >= max_val:
        raise ValueError(
            f'min_val ({min_val}) must be smaller than max_val ({max_val})')

    span = max_val - min_val
    shifted = np.clip(arr, min_val, max_val) - min_val
    bin_index = np.floor(levels * shifted / span).astype(dtype)
    # A value equal to max_val would land in bin `levels`; fold it back.
    return np.minimum(bin_index, levels - 1)
|
| |
|
| |
|
def dequantize(arr, min_val, max_val, levels, dtype=np.float64):
    """Dequantize an array.

    Each bin index is mapped to the centre of its bin, where the range
    [min_val, max_val] is split into ``levels`` equal-width bins.

    Args:
        arr (ndarray): Input array.
        min_val (scalar): Minimum value to be clipped.
        max_val (scalar): Maximum value to be clipped.
        levels (int): Quantization levels, must be an int greater than 1.
        dtype (np.type): The type of the dequantized array.

    Returns:
        ndarray: Dequantized array.

    Raises:
        ValueError: If ``levels`` is not an int greater than 1, or if
            ``min_val`` is not smaller than ``max_val``.
    """
    if not isinstance(levels, int) or levels <= 1:
        raise ValueError(
            f'levels must be a positive integer, but got {levels}')
    if min_val >= max_val:
        raise ValueError(
            f'min_val ({min_val}) must be smaller than max_val ({max_val})')

    # Adding 0.5 targets the middle of each bin rather than its lower edge.
    bin_centres = (arr + 0.5).astype(dtype)
    scaled = bin_centres * (max_val - min_val)
    return scaled / levels + min_val
|
| |
|