File size: 6,417 Bytes
eeef81e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#
# Copyright (c) 2021 The GPflux Contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc

import numpy as np
import pytest
import tensorflow as tf
import tensorflow_probability as tfp

from gpflow.kullback_leiblers import gauss_kl

from gpflux.encoders import DirectlyParameterizedNormalDiag
from gpflux.layers import LatentVariableLayer, LayerWithObservations, TrackableLayer

# Make float64 the default Keras float type. The NumPy arrays built in this file
# (np.zeros, np.ones, np.eye, np.random.randn, ...) are float64 by default.
tf.keras.backend.set_floatx("float64")

############
# Utilities
############


def _zero_one_normal_prior(w_dim):
    """
    Build the N(0, I) prior used by the tests.

    :param w_dim: size passed to ``np.zeros`` / ``np.ones`` for the mean and diagonal scale.
    :return: a ``MultivariateNormalDiag`` with zero ``loc`` and unit ``scale_diag``.
    """
    zero_mean = np.zeros(w_dim)
    unit_scale = np.ones(w_dim)
    return tfp.distributions.MultivariateNormalDiag(loc=zero_mean, scale_diag=unit_scale)


def get_distributions_with_w_dim():
    """
    Create the ``(distribution, w_dim)`` pairs used to parametrize the tests.

    For each dimension in ``(1, 5)`` two zero-mean distributions are produced, in this
    order: a ``MultivariateNormalTriL`` with an identity lower-triangular scale and a
    ``MultivariateNormalDiag`` with a unit diagonal scale.

    :return: list of ``(distribution, w_dim)`` tuples.
    """
    pairs = []
    for w_dim in (1, 5):
        loc = np.zeros(w_dim)
        tril_mvn = tfp.distributions.MultivariateNormalTriL(loc, np.eye(w_dim))
        diag_mvn = tfp.distributions.MultivariateNormalDiag(loc, np.ones(w_dim))
        pairs.extend([(tril_mvn, w_dim), (diag_mvn, w_dim)])
    return pairs


############
# Tests
############


@pytest.mark.parametrize("distribution, w_dim", get_distributions_with_w_dim())
def test_local_kls(distribution, w_dim):
    """
    Check ``LatentVariableLayer._local_kls``: it is zero when the posterior is the prior
    itself, and strictly positive with one value per batch entry when the posterior differs.
    """
    num_batch = 10
    layer = LatentVariableLayer(encoder=None, prior=distribution)

    # posterior identical to the prior: the KL has to vanish
    assert layer._local_kls(distribution) == 0

    # Shift every array-valued parameter of the prior by 0.5 and replicate it along a
    # leading batch axis, so that the posterior differs from the prior for each entry.
    shifted_params = {}
    for name, value in distribution.parameters.items():
        if not isinstance(value, np.ndarray):
            continue
        shifted_params[name] = [value + 0.5 for _ in range(num_batch)]

    shifted_posterior = layer.distribution_class(**shifted_params)
    kls = layer._local_kls(shifted_posterior)
    assert np.all(kls > 0)
    assert kls.shape == (num_batch,)


@pytest.mark.parametrize("w_dim", [1, 5])
def test_local_kl_gpflow_consistency(w_dim):
    """
    Check that the sum of the layer's local KLs agrees with GPflow's ``gauss_kl``
    evaluated on the posterior ``loc`` and ``scale_diag`` parameters.
    """
    num_data = 400
    initial_means = np.random.randn(num_data, w_dim)

    layer = LatentVariableLayer(
        encoder=DirectlyParameterizedNormalDiag(num_data, w_dim, initial_means),
        prior=_zero_one_normal_prior(w_dim),
    )

    observations = [np.random.randn(num_data, 3), np.random.randn(num_data, 2)]
    posteriors = layer._inference_posteriors(observations)

    posterior_params = posteriors.parameters
    expected_kl = gauss_kl(posterior_params["loc"], posterior_params["scale_diag"])
    summed_local_kls = tf.reduce_sum(layer._local_kls(posteriors))

    np.testing.assert_allclose(summed_local_kls, expected_kl, rtol=1e-10)


class ArrayMatcher:
    """
    Equality helper that lets array arguments be checked with ``Mock.assert_*_with``.

    An instance compares equal to an array-like value that has exactly the same shape as
    ``expected`` and is element-wise close to it; NaNs in matching positions count as equal.
    """

    # Defining __eq__ already makes instances unhashable; state that explicitly.
    __hash__ = None

    def __init__(self, expected):
        """
        :param expected: the array-like value the actual argument has to match.
        """
        self.expected = expected

    def __eq__(self, actual):
        """
        Compare ``actual`` against the expected array.

        :param actual: the value to compare; anything ``np.asarray`` can convert.
        :return: True if the shapes are identical and all elements are close, else False.
            Values that cannot be converted or compared numerically yield False rather
            than raising, so that a mock assertion reports a mismatch instead of erroring.
        """
        try:
            actual_array = np.asarray(actual)
            expected_array = np.asarray(self.expected)
            # np.allclose broadcasts its arguments, so without this check an argument
            # with a different (but broadcast-compatible) shape would be accepted.
            if actual_array.shape != expected_array.shape:
                return False
            return bool(np.allclose(actual_array, expected_array, equal_nan=True))
        except (TypeError, ValueError):
            return False

    def __repr__(self):
        """Show the expected value so that mock assertion failures are readable."""
        return f"{type(self).__name__}({self.expected!r})"


@pytest.mark.parametrize("w_dim", [1, 5])
def test_latent_variable_layer_losses(mocker, w_dim):
    """
    Check the losses registered by ``LatentVariableLayer``.

    Called without observations, the layer must not invoke the encoder and must report a
    single zero loss. Called with observations in training mode, the encoder is invoked
    once with the concatenated observations, and the single loss equals the mean KL
    divergence between the (mocked) posteriors and the prior.
    """
    num_data, x_dim, y_dim = 43, 3, 1

    prior = tfp.distributions.MultivariateNormalDiag(
        loc=np.random.randn(w_dim),
        scale_diag=np.random.randn(w_dim) ** 2,
    )
    posteriors = tfp.distributions.MultivariateNormalDiag(
        loc=np.random.randn(num_data, w_dim),
        scale_diag=np.random.randn(num_data, w_dim) ** 2,
    )

    # the mocked encoder always returns the parameters of `posteriors`
    mock_encoder = mocker.Mock(return_value=(posteriors.loc, posteriors.scale.diag))
    layer = LatentVariableLayer(encoder=mock_encoder, prior=prior)

    nan_inputs = np.full((num_data, x_dim), np.nan)
    nan_targets = np.full((num_data, y_dim), np.nan)
    observations = [nan_inputs, nan_targets]
    expected_encoder_inputs = np.concatenate(observations, axis=-1)

    # no observations: encoder untouched, zero loss
    layer(nan_inputs)
    mock_encoder.assert_not_called()
    assert layer.losses == [0.0]

    # with observations, in training mode
    layer(nan_inputs, observations=observations, training=True)

    # assert_called_once_with compares with ==, which fails on arrays; hence ArrayMatcher
    mock_encoder.assert_called_once_with(ArrayMatcher(expected_encoder_inputs), training=True)

    mean_kl = tf.reduce_mean(posteriors.kl_divergence(prior))
    np.testing.assert_equal(layer.losses, [mean_kl])  # also checks shapes match


@pytest.mark.parametrize("w_dim", [1, 5])
@pytest.mark.parametrize("seed2", [None, 42])
def test_latent_variable_layer_samples(mocker, test_data, w_dim, seed2):
    """
    Check the layer output is the inputs concatenated with a latent sample.

    Without observations the sample is expected to come from the prior (one draw per
    datum); with observations in training mode it is expected to come from the posteriors
    returned by the (mocked) encoder. The global TensorFlow seed is reset before each draw
    so that the layer's sample and the reference sample can be compared exactly.
    """
    global_seed = 123

    inputs, targets = test_data
    num_data, _ = inputs.shape

    prior = tfp.distributions.MultivariateNormalDiag(
        loc=np.random.randn(w_dim),
        scale_diag=np.random.randn(w_dim) ** 2,
    )
    posteriors = tfp.distributions.MultivariateNormalDiag(
        loc=np.random.randn(num_data, w_dim),
        scale_diag=np.random.randn(num_data, w_dim) ** 2,
    )

    # the mocked encoder always returns the parameters of `posteriors`
    mock_encoder = mocker.Mock(return_value=(posteriors.loc, posteriors.scale.diag))
    layer = LatentVariableLayer(prior=prior, encoder=mock_encoder)

    # sampling from the prior
    tf.random.set_seed(global_seed)
    layer_prior_output = layer(inputs, seed=seed2)
    tf.random.set_seed(global_seed)
    prior_draw = prior.sample(num_data, seed=seed2)
    np.testing.assert_array_equal(
        layer_prior_output, np.concatenate([inputs, prior_draw], axis=-1)
    )

    # sampling from the posterior
    tf.random.set_seed(global_seed)
    layer_posterior_output = layer(
        inputs, observations=[inputs, targets], training=True, seed=seed2
    )
    tf.random.set_seed(global_seed)
    posterior_draw = posteriors.sample(seed=seed2)
    np.testing.assert_array_equal(
        layer_posterior_output, np.concatenate([inputs, posterior_draw], axis=-1)
    )


def test_no_tensorflow_metaclass_overwritten():
    """
    Guard against ABCMeta replacing a TensorFlow-provided metaclass.

    LayerWithObservations derives from tf.keras.layers.Layer through TrackableLayer and
    uses ABCMeta as its metaclass. That is only safe as long as TrackableLayer itself has
    the plain ``type`` metaclass; otherwise ABCMeta could be displacing some metaclass
    that TensorFlow relies on.
    """
    direct_bases = LayerWithObservations.__bases__
    assert direct_bases == (TrackableLayer,)

    trackable_metaclass = type(TrackableLayer)
    assert trackable_metaclass is type

    observations_metaclass = type(LayerWithObservations)
    assert observations_metaclass is abc.ABCMeta