# STLDM_official / nowcasting / movingmnist_iterator.py
# Uploaded by: sqfoo
# Commit message: Upload 99 files
# Commit: 6021dd1 (verified)
import logging
import cv2
import numpy as np
import math
import os
from nowcasting.config import cfg
logger = logging.getLogger(__name__)
def load_mnist(training_num=50000):
    """Load the MNIST arrays, downloading ``mnist.npz`` on first use.

    The archive is expected at ``<cfg.MNIST_PATH>/mnist.npz``. When that file
    does not exist it is fetched from the mirror URL below and cached there.

    Parameters
    ----------
    training_num : int
        Number of leading training samples to keep from ``X`` and ``Y``.

    Returns
    -------
    X : np.ndarray
        The first ``training_num`` training images.
    Y : np.ndarray
        The matching training labels, flattened to 1-D.
    X_test : np.ndarray
        The test images.
    Y_test : np.ndarray
        The test labels, flattened to 1-D.
    """
    data_path = os.path.join(cfg.MNIST_PATH, "mnist.npz")
    if not os.path.isfile(data_path):
        origin = (
            'https://github.com/sxjscience/mxnet/raw/master/example/bayesian-methods/mnist.npz'
        )
        print('Downloading data from %s to %s' % (origin, data_path))
        from urllib import request
        import ssl
        # NOTE(security): certificate verification is deliberately skipped for
        # this download (as before), but the relaxed context is now scoped to
        # this single request instead of being installed process-wide.
        insecure_context = ssl.create_default_context()
        insecure_context.check_hostname = False
        insecure_context.verify_mode = ssl.CERT_NONE
        # Finish the download before touching the destination, so a failed
        # transfer cannot leave a truncated file that later passes the
        # ``isfile`` check above.
        with request.urlopen(origin, context=insecure_context) as data_file:
            payload = data_file.read()
        target_dir = os.path.dirname(data_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(data_path, 'wb') as output:
            output.write(payload)
        print('Done!')
    # Indexing an NpzFile materialises the arrays, so the archive handle can be
    # closed as soon as the four arrays have been read.
    with np.load(data_path) as dat:
        X = dat['X'][:training_num]
        Y = dat['Y'][:training_num]
        X_test = dat['X_test']
        Y_test = dat['Y_test']
    Y = Y.reshape((Y.shape[0],))
    Y_test = Y_test.reshape((Y_test.shape[0],))
    return X, Y, X_test, Y_test
def move_step(v0, p0, bounding_box):
    """Advance a point by one step, reflecting it off the bounding box walls.

    Parameters
    ----------
    v0 : np.ndarray
        Current velocity (two components); both must be non-zero.
    p0 : np.ndarray
        Current position (two components); must lie inside the box.
    bounding_box : sequence
        ``(xmin, xmax, ymin, ymax)``.

    Returns
    -------
    v : np.ndarray
        Velocity after any reflections (a new array; ``v0`` is not modified).
    p : np.ndarray
        Position after the step and any reflections.
    """
    xmin, xmax, ymin, ymax = bounding_box
    assert (p0[0] >= xmin) and (p0[0] <= xmax) and (p0[1] >= ymin) and (p0[1] <= ymax)
    velocity = v0.copy()
    assert velocity[0] != 0.0 and velocity[1] != 0.0
    position = v0 + p0
    # Walls in the same order as the distance slots below.
    walls = (xmin, xmax, ymin, ymax)
    while (position[0] < xmin) or (position[0] > xmax) \
            or (position[1] < ymin) or (position[1] > ymax):
        vx, vy = velocity
        x, y = position
        # Where the line through the point along the velocity meets each wall.
        y_on_xmin = (xmin - x) * vy / vx + y
        y_on_xmax = (xmax - x) * vy / vx + y
        x_on_ymin = (ymin - y) * vx / vy + x
        x_on_ymax = (ymax - y) * vx / vy + x
        # A wall is a candidate only if the crossing falls on its in-box
        # segment; every other wall keeps an infinite distance.
        dist = np.full((4,), np.inf)
        if ymin <= y_on_xmin <= ymax:
            dist[0] = abs(x - xmin)
        if ymin <= y_on_xmax <= ymax:
            dist[1] = abs(x - xmax)
        if xmin <= x_on_ymin <= xmax:
            dist[2] = abs((y - ymin) * vx / vy)
        if xmin <= x_on_ymax <= xmax:
            dist[3] = abs((y - ymax) * vx / vy)
        nearest = int(np.argmin(dist))
        # Slots 0/1 are the x walls (axis 0); slots 2/3 are the y walls (axis 1).
        axis = nearest // 2
        velocity[axis] = -velocity[axis]
        position[axis] = 2 * walls[nearest] - position[axis]
    return velocity, position
def crop_mnist_digit(digit_img, tol=5):
    """Drop the rows and columns of a digit image that hold no foreground.

    Parameters
    ----------
    digit_img : np.ndarray
        2-D image. A pixel counts as foreground when it exceeds ``tol / 255``.
    tol : number
        Threshold, expressed on a 0-255 scale.

    Returns
    -------
    np.ndarray
        The sub-image made of every row and every column that contains at
        least one foreground pixel.
    """
    threshold = float(tol) / 255.0
    foreground = digit_img > threshold
    rows_with_ink = foreground.any(1)
    cols_with_ink = foreground.any(0)
    return digit_img[np.ix_(rows_with_ink, cols_with_ink)]
class MovingMNISTIterator(object):
    """Generator of Moving-MNIST clips: digits bouncing inside a canvas."""

    def __init__(self):
        self.mnist_train_img, self.mnist_train_label,\
            self.mnist_test_img, self.mnist_test_label = load_mnist()

    def sample(self, digitnum=3, width=64, height=64, seqlen=20, lower=3.0, upper=5.0,
               batch_size=32, index_range=(0, 50000)):
        """Draw a batch of random moving-digit sequences.

        Parameters
        ----------
        digitnum : int
            Number of digits per sequence.
        width : int
            Canvas width in pixels (must be at least 28).
        height : int
            Canvas height in pixels (must be at least 28).
        seqlen : int
            Number of frames per sequence.
        lower, upper : float
            Range the speed (velocity magnitude, pixels per frame) is drawn from.
        batch_size : int
            Number of sequences to generate.
        index_range : tuple
            ``(low, high)`` range of training-image indices to draw digits from.

        Returns
        -------
        seq : np.ndarray
            Shape: (seqlen, batch_size, 1, height, width), dtype uint8.

        Raises
        ------
        ValueError
            If the canvas is smaller than a 28x28 digit.
        """
        if width < 28 or height < 28:
            raise ValueError("width and height must both be at least 28")
        character_indices = np.random.randint(low=index_range[0], high=index_range[1],
                                              size=(batch_size, digitnum))
        angles = np.random.random((batch_size, digitnum)) * (2 * np.pi)
        magnitudes = np.random.random((batch_size, digitnum)) * (upper - lower) + lower
        velocities = np.zeros((batch_size, digitnum, 2), dtype='float32')
        velocities[..., 0] = magnitudes * np.cos(angles)
        velocities[..., 1] = magnitudes * np.sin(angles)
        # Component 0 of a position is the digit centre's ROW, component 1 its
        # COLUMN (that is how the frame is indexed below). Each component is
        # therefore bounded by the matching canvas dimension; bounding both by
        # the width-derived range pushed digits off non-square canvases.
        row_min, row_max = 14.0, float(height) - 14.0
        col_min, col_max = 14.0, float(width) - 14.0
        bounding_box = [row_min, row_max, col_min, col_max]
        positions = np.random.uniform(low=[row_min, col_min],
                                      high=[row_max, col_max],
                                      size=(batch_size, digitnum, 2))
        seq = np.zeros((seqlen, batch_size, 1, height, width), dtype='uint8')
        for i in range(batch_size):
            for j in range(digitnum):
                ind = character_indices[i, j]
                v = velocities[i, j, :]
                p = positions[i, j, :]
                img = self.mnist_train_img[ind].reshape((28, 28))
                for k in range(seqlen):
                    top = int(p[0] - img.shape[0] / 2)
                    left = int(p[1] - img.shape[1] / 2)
                    # Overlapping digits are merged with a per-pixel maximum.
                    patch = seq[k, i, 0, top:top + 28, left:left + 28]
                    seq[k, i, 0, top:top + 28, left:left + 28] = np.maximum(patch, img)
                    v, p = move_step(v, p, bounding_box)
        return seq
class MovingMNISTAdvancedIterator(object):
    """Moving-MNIST generator with rotation, scaling, illumination changes and
    distractor patches, optionally replaying pre-drawn random parameters."""

    def __init__(self,
                 digit_num=None,
                 distractor_num=None,
                 img_size=None,
                 distractor_size=5,
                 max_velocity_scale=3.6,
                 initial_velocity_range=(0.0, 3.6),
                 acceleration_range=(0.0, 0.0),
                 scale_variation_range=(1 / 1.1, 1.1),
                 rotation_angle_range=(-30, 30),
                 global_rotation_angle_range=(-30, 30),
                 illumination_factor_range=(0.6, 1.0),
                 period=5,
                 global_rotation_prob=0.5,
                 index_range=(0, 40000)):
        """
        Parameters
        ----------
        digit_num : int
            Number of digits; defaults to ``cfg.MOVINGMNIST.DIGIT_NUM``.
        distractor_num : int
            Number of distractors; defaults to ``cfg.MOVINGMNIST.DISTRACTOR_NUM``.
        img_size : int
            Side length of the (square) canvas; defaults to
            ``cfg.MOVINGMNIST.IMG_SIZE``.
        distractor_size : int
            Side length of the square distractor patches.
        max_velocity_scale : float
            Each velocity component is clipped to ``[-max, max]`` every frame.
        initial_velocity_range : tuple
            ``(low, high)`` range of the initial speed.
        acceleration_range : tuple
            Stored only; the acceleration code path is disabled in ``sample``.
        scale_variation_range : tuple
            ``(low, high)`` range of the per-digit, per-frame scale multiplier.
        rotation_angle_range : tuple
            ``(low, high)`` range of the per-digit, per-frame rotation increment
            (passed to ``cv2.getRotationMatrix2D`` as ``angle``).
        global_rotation_angle_range : tuple
            Stored only; not read by any method of this class.
        illumination_factor_range : tuple
            ``(low, high)`` range of the per-frame brightness multiplier.
        period : int
            Stored only. NOTE(review): ``sample`` hard-codes 5 as the number of
            frames between direction flips of the scale/illumination drift;
            this argument was presumably meant to control it -- confirm.
        global_rotation_prob : float
            Stored only; not read by any method of this class.
        index_range : tuple
            ``(low, high)`` range of training-image indices to draw from.
        """
        self.mnist_train_img, self.mnist_train_label,\
            self.mnist_test_img, self.mnist_test_label = load_mnist()
        self._digit_num = digit_num if digit_num is not None else cfg.MOVINGMNIST.DIGIT_NUM
        self._img_size = img_size if img_size is not None else cfg.MOVINGMNIST.IMG_SIZE
        self._distractor_size = distractor_size
        self._distractor_num = distractor_num if distractor_num is not None \
            else cfg.MOVINGMNIST.DISTRACTOR_NUM
        self._max_velocity_scale = max_velocity_scale
        self._initial_velocity_range = initial_velocity_range
        self._acceleration_range = acceleration_range
        self._scale_variation_range = scale_variation_range
        self._rotation_angle_range = rotation_angle_range
        self._illumination_factor_range = illumination_factor_range
        self._period = period
        self._global_rotation_angle_range = global_rotation_angle_range
        self._global_rotation_prob = global_rotation_prob
        self._index_range = index_range
        self._h5py_f = None
        self._seq = None
        self._motion_vectors = None
        # Replay state: ``replay`` holds pre-drawn parameters (see save/load),
        # ``replay_index`` is the next sample to consume sequentially.
        self.replay = None
        self.replay_index = 0
        self.replay_numsamples = -1

    def _choose_distractors(self, distractor_seeds):
        """Choose the distractors

        We use the similar approach as
        https://github.com/deepmind/mnist-cluttered/blob/master/mnist_cluttered.lua

        Parameters
        ----------
        distractor_seeds : np.ndarray
            Shape: (distractor_num, 5). Column 2 selects the source image,
            columns 3 and 4 the top-left corner of the crop inside it.
            Values are expected in [0, 1).

        Returns
        -------
        ret : list
            list of distractor images
        """
        ret = []
        index_low, index_high = self._index_range[0], self._index_range[1]
        for i in range(self._distractor_num):
            # Honour the lower bound of index_range as the digit sampling does,
            # so distractors come from the same image subset.
            ind = index_low + math.floor(distractor_seeds[i, 2] * (index_high - index_low))
            distractor_img = self.mnist_train_img[ind].reshape((28, 28))
            distractor_h_begin = math.floor(distractor_seeds[i, 3] * (28 - self._distractor_size))
            distractor_w_begin = math.floor(distractor_seeds[i, 4] * (28 - self._distractor_size))
            distractor_img = distractor_img[
                distractor_h_begin:distractor_h_begin + self._distractor_size,
                distractor_w_begin:distractor_w_begin + self._distractor_size]
            ret.append(distractor_img)
        return ret

    def draw_distractors(self, canvas_img, distractor_seeds):
        """Paste the distractor patches onto ``canvas_img`` in place.

        Parameters
        ----------
        canvas_img : np.ndarray
            Frame to draw on; modified in place.
        distractor_seeds : np.ndarray
            Shape: (distractor_num, 5). Columns 0 and 1 place each patch on
            the canvas; the remaining columns are used to pick the patch.

        Returns
        -------
        canvas_img : np.ndarray
            The same array that was passed in.
        """
        distractor_imgs = self._choose_distractors(distractor_seeds)
        for i, img in enumerate(distractor_imgs):
            r_begin = math.floor(distractor_seeds[i][0] * (self._img_size - img.shape[0]))
            c_begin = math.floor(distractor_seeds[i][1] * (self._img_size - img.shape[1]))
            # Patches overwrite (rather than blend with) what is underneath.
            canvas_img[r_begin:r_begin + img.shape[0],
                       c_begin:c_begin + img.shape[1]] = img
        return canvas_img

    def draw_imgs(self,
                  base_img,
                  affine_transforms,
                  prev_affine_transforms=None):
        """Render one frame by warping every digit onto a blank canvas.

        Parameters
        ----------
        base_img : list
            Inner Shape: (H, W)
        affine_transforms : np.ndarray
            Shape: (digit_num, 2, 3)
        prev_affine_transforms : np.ndarray
            Shape: (digit_num, 2, 3). Accepted but not used.

        Returns
        -------
        canvas_img : np.ndarray
            Shape: (img_size, img_size), float32; per-pixel maximum over the
            warped digits.
        """
        canvas_img = np.zeros(
            (self._img_size, self._img_size), dtype=np.float32)
        for i in range(self._digit_num):
            tmp_img = cv2.warpAffine(base_img[i], affine_transforms[i],
                                     (self._img_size, self._img_size))
            canvas_img = np.maximum(canvas_img, tmp_img)
        return canvas_img

    def _find_center(self, img):
        """Placeholder; always raises ``NotImplementedError``."""
        x, y = np.meshgrid(np.arange(img.shape[0]), np.arange(img.shape[1]))
        raise NotImplementedError

    def _bounce_border(self, inner_boundary, affine_transform, digit_shift,
                       velocity, img_h, img_w):
        """Clamp a digit's centre to ``inner_boundary`` and reflect its velocity.

        Parameters
        ----------
        inner_boundary : np.ndarray
            ``(x_min, y_min, x_max, y_max)`` limits for the digit centre.
        affine_transform : np.ndarray
            Shape: (2, 3); its translation column is adjusted in place.
        digit_shift : np.ndarray
            Shape: (2,); adjusted in place by the same correction.
        velocity : np.ndarray
            Shape: (2,); not modified.
        img_h, img_w : int
            Size of the untransformed digit image.

        Returns
        -------
        affine_transform, digit_shift, new_velocity
        """
        # Where the digit's centre lands on the canvas under this transform.
        center = affine_transform.dot(
            np.array([img_w / 2.0, img_h / 2.0, 1], dtype=np.float32))
        new_velocity = velocity.copy()
        new_center = center.copy()
        if center[0] < inner_boundary[0]:
            new_velocity[0] = -new_velocity[0]
            new_center[0] = inner_boundary[0]
        if center[0] > inner_boundary[2]:
            new_velocity[0] = -new_velocity[0]
            new_center[0] = inner_boundary[2]
        if center[1] < inner_boundary[1]:
            new_velocity[1] = -new_velocity[1]
            new_center[1] = inner_boundary[1]
        if center[1] > inner_boundary[3]:
            new_velocity[1] = -new_velocity[1]
            new_center[1] = inner_boundary[3]
        affine_transform[:, 2] += new_center - center
        digit_shift += new_center - center
        return affine_transform, digit_shift, new_velocity

    def sample(self, batch_size, seqlen, random=True):
        """Generate a batch of sequences.

        Parameters
        ----------
        batch_size : int
        seqlen : int
        random: take random samples from loaded parameters. Ignored if no
            parameters are loaded. When False, loaded parameters are consumed
            sequentially starting at ``self.replay_index``.

        Returns
        -------
        seq : np.ndarray
            Shape: (seqlen, batch_size, 1, H, W)
        motion_vectors : np.ndarray
            Shape: (seqlen, batch_size, 2, H, W). Allocated as zeros and never
            written by this method.

        Raises
        ------
        IndexError
            If parameters are loaded and fewer than ``batch_size`` of them are
            available from the chosen start index.
        """
        if self.replay is not None:
            if random is True:
                last_start = self.replay_numsamples - batch_size
                if last_start < 0:
                    raise IndexError("Not enough pre-generated parameters to create new sample.")
                # randint's upper bound is exclusive: +1 keeps the final
                # window reachable and supports batch_size == num_samples.
                self.replay_index = np.random.randint(last_start + 1)
            elif self.replay_index + batch_size > self.replay_numsamples:
                raise IndexError("Not enough pre-generated parameters to create new sample.")
        seq = np.zeros(
            (seqlen, batch_size, 1, self._img_size, self._img_size),
            dtype=np.float32)
        motion_vectors = np.zeros(
            (seqlen, batch_size, 2, self._img_size, self._img_size),
            dtype=np.float32)
        inner_boundary = np.array(
            [10, 10, self._img_size - 10, self._img_size - 10],
            dtype=np.float32)
        for b in range(batch_size):
            affine_transforms = np.zeros(
                (seqlen, self._digit_num, 2, 3), dtype=np.float32)
            appearance_variants = np.ones(
                (seqlen, self._digit_num), dtype=np.float32)
            scale = np.ones((seqlen, self._digit_num), dtype=np.float32)
            rotation_angle = np.zeros(
                (seqlen, self._digit_num), dtype=np.float32)
            init_velocity = np.zeros(
                shape=(self._digit_num, 2), dtype=np.float32)
            velocity = np.zeros((seqlen, self._digit_num, 2), dtype=np.float32)
            digit_shift = np.zeros(
                (seqlen, self._digit_num, 2), dtype=np.float32)
            if self.replay is not None:
                slot = self.replay_index + b
                digit_indices = self.replay["digit_indices"][slot]
                appearance_mult = self.replay["appearance_mult"][slot]
                scale_variation = self.replay["scale_variation"][slot]
                base_rotation_angle = self.replay["base_rotation_angle"][slot]
                affine_transforms_multipliers = self.replay[
                    "affine_transforms_multipliers"][slot]
                init_velocity_angle = self.replay["init_velocity_angle"][slot]
                init_velocity_magnitude = self.replay[
                    "init_velocity_magnitude"][slot]
                distractor_seeds = self.replay["distractor_seeds"][slot]
                assert(distractor_seeds.shape[0] == seqlen)
            else:
                digit_indices = np.random.randint(
                    low=self._index_range[0],
                    high=self._index_range[1],
                    size=self._digit_num)
                appearance_mult = np.random.uniform(
                    low=self._illumination_factor_range[0],
                    high=self._illumination_factor_range[1])
                scale_variation = np.random.uniform(
                    low=self._scale_variation_range[0],
                    high=self._scale_variation_range[1],
                    size=(self._digit_num, ))
                base_rotation_angle = np.random.uniform(
                    low=self._rotation_angle_range[0],
                    high=self._rotation_angle_range[1],
                    size=(self._digit_num, ))
                affine_transforms_multipliers = np.random.uniform(
                    size=(self._digit_num, 2))
                init_velocity_angle = np.random.uniform(size=(
                    self._digit_num, )) * (2 * np.pi)
                init_velocity_magnitude = np.random.uniform(
                    low=self._initial_velocity_range[0],
                    high=self._initial_velocity_range[1],
                    size=self._digit_num)
                distractor_seeds = np.random.uniform(
                    size=(seqlen, self._distractor_num, 5))
            base_digit_img = [
                crop_mnist_digit(self.mnist_train_img[i].reshape((28, 28)))
                for i in digit_indices
            ]
            # Brightness drifts multiplicatively; the exponent's sign flips
            # every 5 frames (hard-coded; ``self._period`` is not consulted).
            for i in range(1, seqlen):
                appearance_variants[i, :] = appearance_variants[i - 1, :] *\
                    (appearance_mult ** -(2 * ((i // 5) % 2) - 1))
            # Scale follows the same 5-frame alternation; the rotation angle
            # accumulates by a constant per-digit increment.
            for i in range(1, seqlen):
                base_factor = (2 * ((i // 5) % 2) - 1)
                scale[i, :] = scale[i - 1, :] * (scale_variation**base_factor)
                rotation_angle[i, :] = rotation_angle[
                    i - 1, :] + base_rotation_angle
            # Frame 0: identity rotation/scale plus a random translation that
            # keeps the cropped digit inside the canvas.
            affine_transforms[0, :, 0, 0] = 1.0
            affine_transforms[0, :, 1, 1] = 1.0
            for i in range(self._digit_num):
                affine_transforms[0, i, 0, 2] = affine_transforms_multipliers[i, 0] *\
                    (self._img_size - base_digit_img[i].shape[1])
                affine_transforms[0, i, 1, 2] = affine_transforms_multipliers[i, 1] *\
                    (self._img_size - base_digit_img[i].shape[0])
            init_velocity[:, 0] = init_velocity_magnitude * np.cos(
                init_velocity_angle)
            init_velocity[:, 1] = init_velocity_magnitude * np.sin(
                init_velocity_angle)
            curr_velocity = init_velocity
            # NOTE: acceleration is disabled; ``self._acceleration_range`` is
            # not used and the velocity only changes through clipping and
            # border bounces.
            for i in range(self._digit_num):
                digit_shift[0, i, 0] = affine_transforms[0, i, 0, 2]
                digit_shift[0, i, 1] = affine_transforms[0, i, 1, 2]
            for i in range(seqlen - 1):
                velocity[i, :, :] = curr_velocity
                curr_velocity = np.clip(
                    curr_velocity,
                    a_min=-self._max_velocity_scale,
                    a_max=self._max_velocity_scale)
                for j in range(self._digit_num):
                    digit_shift[i + 1, j, :] = digit_shift[
                        i, j, :] + curr_velocity[j]
                    rotation_mat = cv2.getRotationMatrix2D(
                        center=(base_digit_img[j].shape[1] / 2.0,
                                base_digit_img[j].shape[0] / 2.0),
                        angle=rotation_angle[i + 1, j],
                        scale=scale[i + 1, j])
                    affine_transforms[i + 1, j, :, :2] = rotation_mat[:, :2]
                    affine_transforms[i + 1, j, :, 2] = digit_shift[
                        i + 1, j, :] + rotation_mat[:, 2]
                    affine_transforms[i + 1, j, :, :], digit_shift[i + 1, j, :], curr_velocity[j] =\
                        self._bounce_border(inner_boundary=inner_boundary,
                                            affine_transform=affine_transforms[i + 1, j, :, :],
                                            digit_shift=digit_shift[i + 1, j, :],
                                            velocity=curr_velocity[j],
                                            img_h=base_digit_img[j].shape[0],
                                            img_w=base_digit_img[j].shape[1])
            for i in range(seqlen):
                seq[i, b, 0, :, :] = self.draw_imgs(
                    base_img=[
                        base_digit_img[j] * appearance_variants[i, j]
                        for j in range(self._digit_num)
                    ],
                    affine_transforms=affine_transforms[i])
                self.draw_distractors(seq[i, b, 0, :, :], distractor_seeds[i])
        self.replay_index += batch_size
        return seq, motion_vectors

    def load(self, file):
        """Initialize to draw samples from pre-computed parameters.

        Args:
            file: Either the file name (string) or an open file (file-like
                object) from which the data will be loaded.

        Returns:
            Tuple ``(num_samples, seqlen)`` read from the shape of the stored
            ``distractor_seeds`` array.
        """
        self.replay_index = 0
        with np.load(file) as f:
            self.replay = dict(f)
        assert(self.replay["distractor_seeds"].shape[2] == self._distractor_num)
        num_samples, seqlen = self.replay["distractor_seeds"].shape[0:2]
        self.replay_numsamples = num_samples
        return num_samples, seqlen

    def save(self, seqlen, num_samples=10000, file=None):
        """Draw random numbers for num_samples sequences and save them.

        This initializes the state of MovingMNISTAdvancedIterator to generate
        sequences based on the hereby drawn parameters.

        Note that each call to sample(batch_size, seqlen) will use batch_size
        of the num_samples parameters.

        Args:
            seqlen: Length of the sequences the parameters are drawn for.
            num_samples: Number of unique MovingMNISTAdvanced sequences to draw
                parameters for
            file: Either the file name (string) or an open file (file-like
                object) where the data will be saved. If file is a string or a
                Path, the .npz extension will be appended to the file name if
                it is not already there.
        """
        if file is None:
            file = "mnist_{}".format(num_samples)
        self.replay = dict()
        self.replay["digit_indices"] = np.random.randint(
            low=self._index_range[0],
            high=self._index_range[1],
            size=(num_samples, self._digit_num))
        self.replay["appearance_mult"] = np.random.uniform(
            low=self._illumination_factor_range[0],
            high=self._illumination_factor_range[1],
            size=(num_samples, ))
        self.replay["scale_variation"] = np.random.uniform(
            low=self._scale_variation_range[0],
            high=self._scale_variation_range[1],
            size=(num_samples, self._digit_num))
        self.replay["base_rotation_angle"] = np.random.uniform(
            low=self._rotation_angle_range[0],
            high=self._rotation_angle_range[1],
            size=(num_samples, self._digit_num))
        self.replay["affine_transforms_multipliers"] = np.random.uniform(
            size=(num_samples, self._digit_num, 2))
        self.replay["init_velocity_angle"] = np.random.uniform(
            size=(num_samples, self._digit_num)) * 2 * np.pi
        self.replay["init_velocity_magnitude"] = np.random.uniform(
            low=self._initial_velocity_range[0],
            high=self._initial_velocity_range[1],
            size=(num_samples, self._digit_num))
        self.replay["distractor_seeds"] = np.random.uniform(
            size=(num_samples, seqlen, self._distractor_num, 5))
        self.replay_numsamples = num_samples
        # Start sequential replay from the first freshly drawn sample, as
        # load() does; sample() advances replay_index even when no replay
        # data is active, so a stale offset could otherwise carry over.
        self.replay_index = 0
        np.savez_compressed(file=file, **self.replay)
if __name__ == '__main__':
    # Command-line entry point: optionally save or load a parameter file, then
    # render one sequence from MovingMNISTAdvancedIterator to "test.gif".
    from nowcasting.helpers.gifmaker import save_gif
    import argparse
    parser = argparse.ArgumentParser(
        description='Generate sample from MovingMNISTAdvancedIterator gifs.')
    parser.set_defaults(mode='test')
    parser.add_argument(
        '--no-distractors',
        action='store_true',
        help="Don't load/generate/use parameters for distractors.")
    subparsers = parser.add_subparsers(help='Specify saving or loading mode.')
    save_parser = subparsers.add_parser('save', help='Generate a new dataset.')
    save_parser.add_argument(
        'sequences', type=int, help="Number of sequences to generate.")
    save_parser.add_argument('length', type=int, help="Length of each sequence.")
    # The path is a file name, so it must be parsed as a string; with type=int
    # argparse rejected every real path.
    save_parser.add_argument(
        'path', nargs='?', type=str, help="Path to the params file.")
    save_parser.set_defaults(mode='save')
    load_parser = subparsers.add_parser('load', help='Load an existing dataset.')
    load_parser.add_argument(
        'path', nargs='?', type=str, help="Path to the params file.")
    load_parser.set_defaults(mode='load')
    args = parser.parse_args()
    distractor_num = 0 if args.no_distractors else 6
    mnist_generator = MovingMNISTAdvancedIterator(
        distractor_num=distractor_num)
    batch_size = 1
    if args.mode == 'test':
        seqlen = 100
    elif args.mode == 'save':
        fname = args.path if args.path else "params.npz"
        print("Generating {} sequences of length {}. Saving to {}.".format(
            args.sequences, args.length, fname))
        seqlen = args.length
        mnist_generator.save(
            seqlen=seqlen, num_samples=args.sequences, file=fname)
    elif args.mode == 'load':
        fname = args.path if args.path else "params.npz"
        num_sequences, seqlen = mnist_generator.load(file=fname)
        print("Loaded {} sequences of length {} from {}.".format(
            num_sequences, seqlen, fname))
    seq, _ = mnist_generator.sample(batch_size=batch_size, seqlen=seqlen)
    print(seq.sum())
    save_gif(seq[:, 0, 0, :, :].astype(np.float32) / 255.0, "test.gif")