Spaces:
Runtime error
Runtime error
| # Copyright 2017 The TensorFlow Authors All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # ============================================================================== | |
| """Basic blocks for building tensorflow models.""" | |
| import numpy as np | |
| import tensorflow as tf | |
| import block_base | |
| import block_util | |
| # pylint does not recognize block_base.BlockBase.__call__(). | |
| # pylint: disable=not-callable | |
def HandleConvPaddingModes(x, padding, kernel_shape, strides):
  """Returns an updated tensor and padding type for REFLECT and SYMMETRIC.

  Args:
    x: A 4D tensor with shape [batch_size, height, width, depth].
    padding: Padding mode (SAME, VALID, REFLECT, or SYMMETRIC).
    kernel_shape: Shape of convolution kernel that will be applied.
    strides: Convolution stride that will be used.

  Returns:
    x and padding after adjustments for REFLECT and SYMMETRIC.
  """
  # A 1x1 kernel never reads outside the image, so every padding mode is
  # equivalent to VALID.
  if np.all(kernel_shape[:2] == 1):
    return x, 'VALID'
  # SAME and VALID pass straight through to the convolution op.
  if padding != 'REFLECT' and padding != 'SYMMETRIC':
    return x, padding

  # Mirror padding: pad manually by the amount SAME would have used, then
  # run the convolution itself with VALID.
  # From the TensorFlow kernels, the formulas are as follows.
  #   output_shape = ceil(input_shape / strides)
  #   paddings = (output_shape - 1) * strides + filter_size - input_shape
  # Let x, y, s be shorthand for input_shape, output_shape, and strides,
  # respectively, and let (x - 1) = sn + r where 0 <= r < s. Note that
  #   y - 1 = ceil(x / s) - 1 = floor((x - 1) / s) = n
  # provided that x > 0. Therefore
  #   paddings = n * s + filter_size - (sn + r + 1)
  #            = filter_size - r - 1.
  static_shape = x.get_shape()   # shape at graph construction time
  dynamic_hw = tf.shape(x)[1:3]  # image height/width (no batch) at run time
  remainder = tf.mod(dynamic_hw - 1, strides[1:3])
  total_pad = kernel_shape[:2] - remainder - 1
  pad_rows, pad_cols = total_pad[0], total_pad[1]
  paddings = tf.stack([[0, 0],
                       tf.stack([pad_rows // 2, (pad_rows + 1) // 2]),
                       tf.stack([pad_cols // 2, (pad_cols + 1) // 2]),
                       [0, 0]])
  x = tf.pad(x, paddings, mode=padding)
  # tf.pad() discards static shape information; restore what is still known.
  x.set_shape([static_shape[0], x.get_shape()[1],
               x.get_shape()[2], static_shape[3]])
  return x, 'VALID'
class PassThrough(block_base.BlockBase):
  """A no-op transform block: returns its input unchanged."""

  def __init__(self):
    # An empty name disables name scoping for this block.
    super(PassThrough, self).__init__(name='')

  def _Apply(self, inp):
    # Identity transform: hand the input back untouched.
    return inp

  def initialized(self):
    """Always returns True."""
    return True
class Bias(object):
  """An initialization helper class for BiasAdd block below."""

  def __init__(self, value=0):
    # Constant with which the bias variable will be initialized.
    self.value = value
class BiasAdd(block_base.BlockBase):
  """A tf.nn.bias_add wrapper.

  This wrapper may act as a PassThrough block depending on the initializer
  provided, to make easier optional bias applications in NN blocks, etc.
  See __init__() for the details.
  """

  def __init__(self, initializer=Bias(0), name=None):
    """Initializes Bias block.

    |initializer| parameter have two special cases.
    1. If initializer is None, then this block works as a PassThrough.
    2. If initializer is a Bias class object, then tf.constant_initializer is
       used with the stored value.

    Args:
      initializer: An initializer for the bias variable.
      name: Name of this block.
    """
    super(BiasAdd, self).__init__(name)
    with self._BlockScope():
      if isinstance(initializer, Bias):
        self._initializer = tf.constant_initializer(value=initializer.value)
      else:
        self._initializer = initializer
    # Created lazily on first _Apply(), once the input depth is known.
    self._bias = None

  def _Apply(self, x):
    # Use an explicit `is None` check (consistent with Conv2DBase._Apply):
    # truthiness of a tf Variable is ambiguous in graph mode and must not be
    # used to detect "not yet created".
    if self._bias is None:
      init = self._initializer([int(x.get_shape()[-1])], x.dtype)
      self._bias = self.NewVar(init)
    return tf.nn.bias_add(x, self._bias)

  def CreateWeightLoss(self):
    # Bias variables are exempt from weight decay.
    return []
class LinearBase(block_base.BlockBase):
  """A matmul wrapper.

  Returns input * W, where matrix W can be customized through derivation.
  """

  def __init__(self, depth, name=None):
    super(LinearBase, self).__init__(name)
    with self._BlockScope():
      self._depth = depth
      # Created lazily on first _Apply(), once the input depth is known.
      self._matrix = None

  def _CreateKernel(self, shape, dtype):
    raise NotImplementedError('This method must be sub-classed.')

  def _Apply(self, x):
    # Use an explicit `is None` check (consistent with Conv2DBase._Apply):
    # truthiness of a tf Variable/Tensor is ambiguous in graph mode and must
    # not be used to detect "not yet created".
    if self._matrix is None:
      shape = [int(x.get_shape()[-1]), self._depth]
      self._matrix = self._CreateKernel(shape, x.dtype)
    return tf.matmul(x, self._matrix)
class Linear(LinearBase):
  """A matmul wrapper.

  Returns input * W, where matrix W is learned.
  """

  def __init__(self,
               depth,
               initializer=block_util.RsqrtInitializer(),
               name=None):
    super(Linear, self).__init__(depth, name)
    with self._BlockScope():
      self._initializer = initializer

  def _CreateKernel(self, shape, dtype):
    # Materialize the learned weight matrix as a new block variable.
    return self.NewVar(self._initializer(shape, dtype))
class NN(block_base.BlockBase):
  """A neural network layer wrapper.

  Returns act(input * W + b), where matrix W, bias b are learned, and act is an
  optional activation function (i.e., nonlinearity).

  This transform block can handle multiple inputs. If x_1, x_2, ..., x_m are
  the inputs, then returns act(x_1 * W_1 + ... + x_m * W_m + b).

  Attributes:
    nunits: The dimension of the output.
  """

  def __init__(self,
               depth,
               bias=Bias(0),
               act=None,  # e.g., tf.nn.relu
               initializer=block_util.RsqrtInitializer(),
               linear_block_factory=(lambda d, i: Linear(d, initializer=i)),
               name=None):
    """Initializes NN block.

    Args:
      depth: The depth of the output.
      bias: An initializer for the bias, or a Bias class object. If None, there
        will be no bias term for this NN block. See BiasAdd block.
      act: Optional activation function. If None, no activation is applied.
      initializer: The initialization method for the matrix weights.
      linear_block_factory: A function used to create a linear block.
      name: The name of this block.
    """
    super(NN, self).__init__(name)
    with self._BlockScope():
      self._linear_block_factory = linear_block_factory
      self._depth = depth
      self._initializer = initializer
      # One linear block per input; created lazily on first _Apply().
      self._matrices = None
      self._bias = BiasAdd(bias) if bias else PassThrough()
      self._act = act if act else PassThrough()

  def _Apply(self, *args):
    # Use an explicit `is None` check (consistent with Conv2DBase._Apply):
    # list truthiness would conflate "never built" with "built from zero
    # inputs" and could silently re-create the kernels on a later call.
    if self._matrices is None:
      self._matrices = [
          self._linear_block_factory(self._depth, self._initializer)
          for _ in args]
    if len(self._matrices) != len(args):
      raise ValueError('{} expected {} inputs, but observed {} inputs'.format(
          self.name, len(self._matrices), len(args)))
    if len(args) > 1:
      y = tf.add_n([m(x) for m, x in zip(self._matrices, args)])
    else:
      y = self._matrices[0](args[0])
    return self._act(self._bias(y))
class Conv2DBase(block_base.BlockBase):
  """A tf.nn.conv2d operator."""

  def __init__(self, depth, filter_size, strides, padding,
               bias=None, act=None, atrous_rate=None, conv=tf.nn.conv2d,
               name=None):
    """Initializes a Conv2DBase block.

    Arguments:
      depth: The output depth of the block (i.e. #filters); if negative, the
        output depth will be set to be the same as the input depth.
      filter_size: The size of the 2D filter. If it's specified as an integer,
        it's going to create a square filter. Otherwise, this is a tuple
        specifying the height x width of the filter.
      strides: A tuple specifying the y and x stride.
      padding: One of the valid padding modes allowed by tf.nn.conv2d, or
        'REFLECT'/'SYMMETRIC' for mirror padding.
      bias: An initializer for the bias, or a Bias class object. If None, there
        will be no bias in this block. See BiasAdd block.
      act: Optional activation function applied to the output.
      atrous_rate: optional input rate for ATrous convolution. If not None, this
        will be used and the strides will be ignored.
      conv: The convolution function to use (e.g. tf.nn.conv2d).
      name: The name for this conv2d op.
    """
    super(Conv2DBase, self).__init__(name)
    with self._BlockScope():
      # None for bias/act means "skip that step": PassThrough is the identity.
      self._act = act if act else PassThrough()
      self._bias = BiasAdd(bias) if bias else PassThrough()
      # Kernel shape is [height, width, in_depth, out_depth]. The input depth
      # (index 2) stays 0 until the first _Apply() reveals it.
      self._kernel_shape = np.zeros((4,), dtype=np.int32)
      self._kernel_shape[:2] = filter_size
      self._kernel_shape[3] = depth
      # Strides in NHWC order; batch and depth strides are fixed at 1.
      self._strides = np.ones((4,), dtype=np.int32)
      self._strides[1:3] = strides
      self._strides = list(self._strides)
      self._padding = padding
      # The kernel variable is created lazily on the first _Apply().
      self._kernel = None
      self._conv = conv
      self._atrous_rate = atrous_rate

  def _CreateKernel(self, shape, dtype):
    # Subclasses decide how the kernel variable is initialized/created.
    raise NotImplementedError('This method must be sub-classed')

  def _Apply(self, x):
    """Apply the self._conv op.

    Arguments:
      x: input tensor. It needs to be a 4D tensor of the form
        [batch, height, width, channels].

    Returns:
      The output of the convolution of x with the current convolutional
      kernel.

    Raises:
      ValueError: if number of channels is not defined at graph construction.
    """
    input_shape = x.get_shape().with_rank(4)
    input_shape[3:].assert_is_fully_defined()  # channels must be defined
    if self._kernel is None:
      # First application: fill in the input depth and build the kernel.
      assert self._kernel_shape[2] == 0, self._kernel_shape
      self._kernel_shape[2] = input_shape[3].value
      if self._kernel_shape[3] < 0:
        # Make output depth be the same as input depth.
        self._kernel_shape[3] = self._kernel_shape[2]
      self._kernel = self._CreateKernel(self._kernel_shape, x.dtype)
    # REFLECT/SYMMETRIC modes pre-pad x and switch padding to 'VALID'.
    x, padding = HandleConvPaddingModes(
        x, self._padding, self._kernel_shape, self._strides)
    if self._atrous_rate is None:
      x = self._conv(x, self._kernel, strides=self._strides, padding=padding)
    else:
      # Atrous convolution takes a dilation rate instead of strides.
      x = self._conv(x, self._kernel, rate=self._atrous_rate, padding=padding)
    if self._padding != 'VALID':
      # Manually update shape. Known shape information can be lost by tf.pad().
      # height/width follow the SAME-padding formula ceil(input / stride);
      # None is kept where the static dimension is unknown.
      height = (1 + (input_shape[1].value - 1) // self._strides[1]
                if input_shape[1].value else None)
      width = (1 + (input_shape[2].value - 1) // self._strides[2]
               if input_shape[2].value else None)
      shape = x.get_shape()
      x.set_shape([shape[0], height, width, shape[3]])
    return self._act(self._bias(x))
class Conv2D(Conv2DBase):
  """A tf.nn.conv2d operator."""

  def __init__(self, depth, filter_size, strides, padding,
               bias=None, act=None, initializer=None, name=None):
    """Initializes a Conv2D block.

    Arguments:
      depth: The output depth of the block (i.e., #filters)
      filter_size: The size of the 2D filter. If it's specified as an integer,
        it's going to create a square filter. Otherwise, this is a tuple
        specifying the height x width of the filter.
      strides: A tuple specifying the y and x stride.
      padding: One of the valid padding modes allowed by tf.nn.conv2d, or
        'REFLECT'/'SYMMETRIC' for mirror padding.
      bias: An initializer for the bias, or a Bias class object. If None, there
        will be no bias in this block. See BiasAdd block.
      act: Optional activation function applied to the output.
      initializer: Optional initializer for weights.
      name: The name for this conv2d op.
    """
    super(Conv2D, self).__init__(depth, filter_size, strides, padding, bias,
                                 act, conv=tf.nn.conv2d, name=name)
    with self._BlockScope():
      # Default to an rsqrt initializer reduced over the height, width, and
      # input-depth dimensions of the kernel.
      if initializer is None:
        initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
      self._initializer = initializer

  def _CreateKernel(self, shape, dtype):
    init = self._initializer(shape, dtype)
    return self.NewVar(init)