codewraith / data /source_files /clean /03a9ee616ef3.py
slenk's picture
Upload folder using huggingface_hub
eeef81e verified
#
# Copyright (c) 2021 The GPflux Contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import numpy as np
import pytest
import tensorflow as tf
import tensorflow_probability as tfp
from gpflow.kullback_leiblers import gauss_kl
from gpflux.encoders import DirectlyParameterizedNormalDiag
from gpflux.layers import LatentVariableLayer, LayerWithObservations, TrackableLayer
# Make Keras default to float64 so it agrees with the float64 NumPy arrays
# (np.zeros, np.ones, np.random.randn, ...) that the tests below feed in.
tf.keras.backend.set_floatx("float64")
############
# Utilities
############
def _zero_one_normal_prior(w_dim):
    """Return the N(0, I) prior as a diagonal multivariate normal of dimension ``w_dim``."""
    zero_mean = np.zeros(w_dim)
    unit_scale = np.ones(w_dim)
    return tfp.distributions.MultivariateNormalDiag(loc=zero_mean, scale_diag=unit_scale)
def get_distributions_with_w_dim():
    """
    Build the ``(distribution, w_dim)`` cases used to parametrize the KL tests.

    For each dimension in (1, 5) two zero-mean distributions are produced, in this order:
    a ``MultivariateNormalTriL`` with an identity scale matrix and a
    ``MultivariateNormalDiag`` with a unit scale vector.
    """

    def _cases_for(dim):
        # Both distributions share the same zero mean vector.
        loc = np.zeros(dim)
        full = tfp.distributions.MultivariateNormalTriL(loc, np.eye(dim))
        diag = tfp.distributions.MultivariateNormalDiag(loc, np.ones(dim))
        return [(full, dim), (diag, dim)]

    return [case for dim in (1, 5) for case in _cases_for(dim)]
############
# Tests
############
@pytest.mark.parametrize("distribution, w_dim", get_distributions_with_w_dim())
def test_local_kls(distribution, w_dim):
    """
    Check ``LatentVariableLayer._local_kls`` against a prior-equal and a shifted posterior.

    The KL must be zero when the posterior is the prior itself, and strictly positive
    (one value per batch element) once the posterior parameters are shifted.
    """
    layer = LatentVariableLayer(encoder=None, prior=distribution)

    # Posterior identical to the prior: the KL has to vanish.
    assert layer._local_kls(distribution) == 0

    # Posterior different from the prior: shift every array-valued parameter of the
    # prior by 0.5 and replicate it batch_size times to obtain a batched posterior.
    batch_size = 10
    shifted_params = {}
    for name, value in distribution.parameters.items():
        if not isinstance(value, np.ndarray):
            continue
        shifted_params[name] = [value + 0.5 for _ in range(batch_size)]
    shifted_posterior = layer.distribution_class(**shifted_params)

    kls = layer._local_kls(shifted_posterior)
    assert np.all(kls > 0)
    assert kls.shape == (batch_size,)
@pytest.mark.parametrize("w_dim", [1, 5])
def test_local_kl_gpflow_consistency(w_dim):
    """
    Check that the summed local KLs of the layer agree with GPflow's ``gauss_kl``.

    The posterior comes from a ``DirectlyParameterizedNormalDiag`` encoder with random
    means and the prior is N(0, I); both KL computations must match to ``rtol=1e-10``.
    """
    num_data = 400
    initial_means = np.random.randn(num_data, w_dim)
    encoder = DirectlyParameterizedNormalDiag(num_data, w_dim, initial_means)
    layer = LatentVariableLayer(encoder=encoder, prior=_zero_one_normal_prior(w_dim))

    # Two random observation arrays with 3 and 2 columns respectively.
    observations = [np.random.randn(num_data, 3), np.random.randn(num_data, 2)]
    posteriors = layer._inference_posteriors(observations)

    posterior_params = posteriors.parameters
    expected_kl = gauss_kl(posterior_params["loc"], posterior_params["scale_diag"])
    summed_local_kls = tf.reduce_sum(layer._local_kls(posteriors))

    np.testing.assert_allclose(summed_local_kls, expected_kl, rtol=1e-10)
class ArrayMatcher:
    """
    Equality helper for asserting on mock calls that receive array arguments.

    An instance compares equal to any value that is element-wise close to ``expected``
    according to ``np.allclose`` (NaNs in matching positions count as equal). This allows
    it to stand in for an array inside ``Mock.assert_called_once_with``, where a plain
    ``==`` between arrays would not yield a single boolean.

    :param expected: the array-like value that the actual argument must be close to.
    """

    # Defining __eq__ already makes instances unhashable; spelled out for clarity.
    __hash__ = None

    def __init__(self, expected):
        self.expected = expected

    def __eq__(self, actual):
        """Return True iff ``actual`` is element-wise close to the expected array."""
        try:
            return bool(np.allclose(actual, self.expected, equal_nan=True))
        except (TypeError, ValueError):
            # Non-numeric values or shapes that cannot be broadcast together are a plain
            # mismatch; report False so the caller gets a normal assertion failure
            # instead of an unrelated exception raised from inside the comparison.
            return False

    def __repr__(self):
        # Shown in mock assertion messages, so include the expected value.
        return f"{type(self).__name__}({self.expected!r})"
@pytest.mark.parametrize("w_dim", [1, 5])
def test_latent_variable_layer_losses(mocker, w_dim):
    """
    Check the losses recorded by ``LatentVariableLayer`` with and without observations.

    Without observations the encoder must not be called and the only loss is zero.
    With observations (and ``training=True``) the encoder receives the concatenated
    observations and the loss equals the mean KL between posteriors and prior.
    """
    num_data, x_dim, y_dim = 43, 3, 1

    def _random_diag_normal(*shape):
        # Squaring the standard-normal draws keeps the scale non-negative.
        return tfp.distributions.MultivariateNormalDiag(
            loc=np.random.randn(*shape),
            scale_diag=np.random.randn(*shape) ** 2,
        )

    prior = _random_diag_normal(w_dim)
    posteriors = _random_diag_normal(num_data, w_dim)

    # The mocked encoder always returns the parameters of the fixed posteriors.
    encoder = mocker.Mock(return_value=(posteriors.loc, posteriors.scale.diag))
    layer = LatentVariableLayer(encoder=encoder, prior=prior)

    # The data is NaN-filled; ArrayMatcher compares with equal_nan=True, so it still matches.
    inputs = np.full((num_data, x_dim), np.nan)
    targets = np.full((num_data, y_dim), np.nan)
    observations = [inputs, targets]
    concatenated = np.concatenate(observations, axis=-1)

    # Call without observations.
    _ = layer(inputs)
    encoder.assert_not_called()
    assert layer.losses == [0.0]

    # Call with observations in training mode.
    _ = layer(inputs, observations=observations, training=True)
    # assert_called_once_with uses == for comparison which fails on arrays
    encoder.assert_called_once_with(ArrayMatcher(concatenated), training=True)

    mean_kl = tf.reduce_mean(posteriors.kl_divergence(prior))
    np.testing.assert_equal(layer.losses, [mean_kl])  # also checks shapes match
@pytest.mark.parametrize("w_dim", [1, 5])
@pytest.mark.parametrize("seed2", [None, 42])
def test_latent_variable_layer_samples(mocker, test_data, w_dim, seed2):
    """
    Check that the layer output is the inputs concatenated with the expected samples.

    Without observations the samples must equal draws from the prior; with observations
    (and ``training=True``) they must equal draws from the posteriors given by the
    mocked encoder. The global TensorFlow seed is reset before every draw so that the
    layer and the reference draw are comparable; ``seed2`` is passed through as the
    per-call seed.
    """
    global_seed = 123
    inputs, targets = test_data
    num_data, _ = inputs.shape

    def _random_diag_normal(*shape):
        # Squaring the standard-normal draws keeps the scale non-negative.
        return tfp.distributions.MultivariateNormalDiag(
            loc=np.random.randn(*shape),
            scale_diag=np.random.randn(*shape) ** 2,
        )

    def _reseed():
        tf.random.set_seed(global_seed)

    prior = _random_diag_normal(w_dim)
    posteriors = _random_diag_normal(num_data, w_dim)

    encoder = mocker.Mock(return_value=(posteriors.loc, posteriors.scale.diag))
    layer = LatentVariableLayer(prior=prior, encoder=encoder)

    # No observations: compare against num_data draws from the prior.
    _reseed()
    layer_prior_output = layer(inputs, seed=seed2)
    _reseed()
    prior_draw = prior.sample(num_data, seed=seed2)
    np.testing.assert_array_equal(
        layer_prior_output, np.concatenate([inputs, prior_draw], axis=-1)
    )

    # Observations in training mode: compare against a draw from the posteriors.
    _reseed()
    layer_posterior_output = layer(
        inputs, observations=[inputs, targets], training=True, seed=seed2
    )
    _reseed()
    posterior_draw = posteriors.sample(seed=seed2)
    np.testing.assert_array_equal(
        layer_posterior_output, np.concatenate([inputs, posterior_draw], axis=-1)
    )
def test_no_tensorflow_metaclass_overwritten():
    """
    Guard against accidentally discarding a TensorFlow metaclass.

    LayerWithObservations derives from tf.keras.layers.Layer through TrackableLayer and
    uses ABCMeta as its metaclass. That is only safe while TrackableLayer itself has no
    custom metaclass (i.e. its type is plain ``type``); otherwise adding ABCMeta could
    drop some required TensorFlow metaclass behaviour.
    """
    direct_bases = LayerWithObservations.__bases__
    assert direct_bases == (TrackableLayer,)

    trackable_metaclass = type(TrackableLayer)
    assert trackable_metaclass is type

    observations_metaclass = type(LayerWithObservations)
    assert observations_metaclass is abc.ABCMeta