| | from functools import partial |
| | import jax |
| | from typing import Any, Callable, Sequence, Optional, NewType |
| | from jax import lax, random, vmap, numpy as jnp |
| | from jax.experimental.ode import odeint |
| | import flax |
| | from flax.training import train_state |
| | from flax import traverse_util |
| | from flax.core import freeze, unfreeze |
| | from flax import linen as nn |
| | from flax import serialization |
| | import optax |
| | import tensorflow_datasets as tfds |
| | import numpy as np |
| | from tqdm import tqdm |
| | import os |
| |
|
| |
|
| | |
| | class ResDownBlock(nn.Module): |
| | """Single ResBlock w/ downsample""" |
| | dim_out: Any = 64 |
| |
|
| | @nn.compact |
| | def __call__(self, inputs): |
| | x = inputs |
| | f_x = nn.relu(nn.GroupNorm(self.dim_out)(x)) |
| | x = nn.Conv(features=self.dim_out, kernel_size=(1, 1), strides=(2, 2))(x) |
| | f_x = nn.Conv(features=self.dim_out, kernel_size=(3, 3), strides=(2, 2))(f_x) |
| | f_x = nn.relu(nn.GroupNorm(self.dim_out)(f_x)) |
| | f_x = nn.Conv(features=self.dim_out, kernel_size=(3, 3))(f_x) |
| | x = f_x + x |
| | return x |
| |
|
| |
|
| | class ConcatConv2D(nn.Module): |
| | """Concat dynamics to hidden layer""" |
| | dim_out: Any = 64 |
| | ksize: Any = 3 |
| |
|
| | @nn.compact |
| | def __call__(self, inputs, t): |
| | x = inputs |
| | tt = jnp.ones_like(x[:, :, :1]) * t |
| | ttx = jnp.concatenate([tt, x], -1) |
| | return nn.Conv(features=self.dim_out, kernel_size=(self.ksize, self.ksize))(ttx) |
| |
|
| |
|
| | |
| | class ODEfunc(nn.Module): |
| | """ODE function which replace ResNet""" |
| | dim_out: Any = 64 |
| | ksize: Any = 3 |
| |
|
| | @nn.compact |
| | def __call__(self, inputs, t): |
| | |
| | |
| | |
| |
|
| | x = inputs |
| | out = nn.GroupNorm(self.dim_out)(x) |
| | out = nn.relu(out) |
| | out = ConcatConv2D(self.dim_out, self.ksize)(out, t) |
| | out = nn.GroupNorm(self.dim_out)(out) |
| | out = nn.relu(out) |
| | out = ConcatConv2D(self.dim_out, self.ksize)(out, t) |
| | out = nn.GroupNorm(self.dim_out)(out) |
| |
|
| | return out |
| |
|
| |
|
| | class NFEcounter(nn.Module): |
| |
|
| | @nn.compact |
| | def __call__(self): |
| | is_initialized = self.has_variable('nfe', 'nfe') |
| | nfe = self.variable('nfe', 'nfe', jnp.array, [0]) |
| | if is_initialized: |
| | nfe.value += 1 |
| |
|
| |
|
| | class ODEBlock(nn.Module): |
| | """ODE block which contains odeint""" |
| | tol: Any = 1. |
| |
|
| | @nn.compact |
| | def __call__(self, inputs, params): |
| | ode_func = ODEfunc() |
| | ode_func_apply = lambda x, t: ode_func.apply(variables={'params': params}, inputs=x, t=t) |
| | init_state, final_state = odeint(ode_func_apply, |
| | inputs, jnp.array([0., 1.]), |
| | rtol=self.tol, atol=self.tol) |
| | return final_state |
| |
|
| |
|
| | class ODEBlockVmap(nn.Module): |
| | """Apply vmap to ODEBlock""" |
| | tol: Any = 1. |
| |
|
| | @nn.compact |
| | def __call__(self, inputs, params): |
| | x = inputs |
| | vmap_odeblock = nn.vmap(ODEBlock, |
| | variable_axes={'params': 0}, |
| | split_rngs={'params': True}, |
| | in_axes=(0, None)) |
| |
|
| | return vmap_odeblock(tol=self.tol, name='odeblock')(x, params) |
| |
|
| |
|
| | class FullODENet(nn.Module): |
| | """Full ODE net which contains two downsampling layers, ODE block and linear classifier.""" |
| | dim_out: Any = 64 |
| | ksize: Any = 3 |
| | tol: Any = 1. |
| |
|
| | @nn.compact |
| | def __call__(self, inputs): |
| | x = inputs |
| | x = nn.Conv(features=self.dim_out, kernel_size=(self.ksize, self.ksize))(x) |
| | x = ResDownBlock()(x) |
| | x = ResDownBlock()(x) |
| |
|
| | ode_func = ODEfunc() |
| | init_fn = lambda rng, x: ode_func.init(random.split(rng)[-1], x, 0.)['params'] |
| | ode_func_params = self.param('ode_func', init_fn, jnp.ones_like(x[0])) |
| | x = ODEBlockVmap(tol=self.tol)(x, ode_func_params) |
| |
|
| | x = nn.GroupNorm(self.dim_out)(x) |
| | x = nn.relu(x) |
| | x = nn.avg_pool(x, (1, 1)) |
| |
|
| | x = x.reshape((x.shape[0], -1)) |
| |
|
| | x = nn.Dense(features=10)(x) |
| | x = nn.log_softmax(x) |
| |
|
| | return x |
| |
|
| |
|
| | |
| | @jax.jit |
| | def cross_entropy_loss(logits, labels): |
| | one_hot_labels = jax.nn.one_hot(labels, num_classes=10) |
| | return -jnp.mean(jnp.sum(one_hot_labels * logits, axis=-1)) |
| |
|
| |
|
| | |
| | @jax.jit |
| | def compute_metrics(logits, labels): |
| | loss = cross_entropy_loss(logits=logits, labels=labels) |
| | accuracy = jnp.mean(jnp.argmax(logits, -1) == labels) |
| | metrics = { |
| | 'loss': loss, |
| | 'accuracy': accuracy, |
| | } |
| | return metrics |
| |
|
| |
|
| | def get_datasets(): |
| | """Load MNIST train and test datasets into memory.""" |
| | ds_builder = tfds.builder('mnist') |
| | ds_builder.download_and_prepare() |
| | train_ds = tfds.as_numpy(ds_builder.as_dataset(split='train', batch_size=-1)) |
| | test_ds = tfds.as_numpy(ds_builder.as_dataset(split='test', batch_size=-1)) |
| | train_ds['image'] = jnp.float32(train_ds['image']) / 255. |
| | test_ds['image'] = jnp.float32(test_ds['image']) / 255. |
| | return train_ds, test_ds |
| |
|
| |
|
| | def create_train_state(rng, learning_rate, tol): |
| | """Creates initial 'TrainState'.""" |
| | odenet = FullODENet(tol=tol) |
| | params = odenet.init(rng, jnp.ones([1, 28, 28, 1]))['params'] |
| | tx = optax.adam(learning_rate) |
| | return train_state.TrainState.create( |
| | apply_fn=odenet.apply, params=params, tx=tx |
| | ) |
| |
|
| |
|
| | |
| | @partial(jax.jit, static_argnums=(2,)) |
| | def train_step(state, batch, tol): |
| | """Train for a single step.""" |
| | def loss_fn(params): |
| | logits = FullODENet(tol=tol).apply({'params': params}, batch['image']) |
| | loss = cross_entropy_loss(logits=logits, labels=batch['label']) |
| | return loss, logits |
| | grad_fn = jax.value_and_grad(loss_fn, has_aux=True) |
| | (_, logits), grads = grad_fn(state.params) |
| | state = state.apply_gradients(grads=grads) |
| | metrics = compute_metrics(logits=logits, labels=batch['label']) |
| | return state, metrics |
| |
|
| |
|
| | |
| | @partial(jax.jit, static_argnums=(2,)) |
| | def eval_step(params, batch, tol): |
| | logits = FullODENet(tol=tol).apply({'params': params}, batch['image']) |
| | return compute_metrics(logits=logits, labels=batch['label']) |
| |
|
| |
|
| | |
| | def train_epoch(state, train_ds, batch_size, epoch, rng, tol): |
| | """Train for a single epoch""" |
| | train_ds_size = len(train_ds['image']) |
| | steps_per_epoch = train_ds_size // batch_size |
| |
|
| | perms = jax.random.permutation(rng, len(train_ds['image'])) |
| | perms = perms[:steps_per_epoch * batch_size] |
| | perms = perms.reshape((steps_per_epoch, batch_size)) |
| | batch_metrics = [] |
| | for perm in tqdm(perms): |
| | batch = {k: v[perm, ...] for k, v in train_ds.items()} |
| | state, metrics = train_step(state, batch, tol) |
| | batch_metrics.append(metrics) |
| |
|
| | |
| | batch_metrics_np = jax.device_get(batch_metrics) |
| | epoch_metrics_np = { |
| | k: np.mean([metrics[k] for metrics in batch_metrics_np]) |
| | for k in batch_metrics_np[0] |
| | } |
| | print('train epoch: %d, loss: %.4f, accuracy: %.2f' % ( |
| | epoch, epoch_metrics_np['loss'], epoch_metrics_np['accuracy'] * 100 |
| | )) |
| |
|
| | return state |
| |
|
| |
|
| | |
| | def eval_model(params, test_ds, tol): |
| | metrics = eval_step(params, test_ds, tol) |
| | metrics = jax.device_get(metrics) |
| | summary = jax.tree_map(lambda x: x.item(), metrics) |
| | return summary['loss'], summary['accuracy'] |
| |
|
| |
|
| | def train_and_evaluate(learning_rate, n_epoch, batch_size, tol): |
| | train_ds, test_ds = get_datasets() |
| | rng = jax.random.PRNGKey(0) |
| | rng, init_rng = jax.random.split(rng) |
| |
|
| | state = create_train_state(init_rng, learning_rate, tol) |
| | del init_rng |
| |
|
| | for epoch in tqdm(range(1, n_epoch + 1)): |
| | rng, input_rng = jax.random.split(rng) |
| | state = train_epoch(state, train_ds, batch_size, epoch, input_rng, tol) |
| | test_loss, test_accuracy = eval_model(state.params, test_ds, tol) |
| | print(' test epoch: %d, loss: %.2f, accuracy: %.2f' % ( |
| | epoch, test_loss, test_accuracy * 100 |
| | )) |
| |
|
| |
|
| | if __name__ == '__main__': |
| | train_and_evaluate(0.0001, 3, 128, 1.) |
| |
|