| """Defines NeuMF model for NCF framework.
|
|
|
| Some abbreviations used in the code base:
|
| NeuMF: Neural Matrix Factorization
|
| NCF: Neural Collaborative Filtering
|
| GMF: Generalized Matrix Factorization
|
| MLP: Multi-Layer Perceptron
|
|
|
GMF applies a linear kernel to model the latent feature interactions, and MLP

uses a nonlinear kernel to learn the interaction function from data. The NeuMF

model fuses GMF and MLP to better model complex user-item interactions, and

unifies the strengths of the linearity of MF and the non-linearity of MLP for

modeling the user-item latent structures.
|
|
|
NeuMF allows GMF and MLP to learn separate embeddings, and combines the two

models by concatenating their last hidden layers.
|
| """
|
| from __future__ import absolute_import
|
| from __future__ import division
|
| from __future__ import print_function
|
|
|
| import sys
|
|
|
| from six.moves import xrange
|
| import tensorflow as tf, tf_keras
|
| from tensorflow import estimator as tf_estimator
|
| from typing import Any, Dict, Text
|
|
|
| from official.recommendation import constants as rconst
|
| from official.recommendation import movielens
|
| from official.recommendation import ncf_common
|
| from official.recommendation import stat_utils
|
|
|
|
|
| def sparse_to_dense_grads(grads_and_vars):
|
| """Convert sparse gradients to dense gradients.
|
|
|
| All sparse gradients, which are represented as instances of tf.IndexedSlices,
|
  are converted to dense Tensors. Dense gradients, which are represented as
|
| Tensors, are unchanged.
|
|
|
  This conversion is done because, for the small embeddings used by this model,

  applying dense gradients with the AdamOptimizer is faster than applying

  sparse gradients.
|
|
|
  Args:
|
| grads_and_vars: A list of (gradient, variable) tuples. Each gradient can
|
| be a Tensor or an IndexedSlices. Tensors are unchanged, and IndexedSlices
|
| are converted to dense Tensors.
|
| Returns:
|
| The same list of (gradient, variable) as `grads_and_vars`, except each
|
| IndexedSlices gradient is converted to a Tensor.
|
| """
|
|
|
|
|
|
|
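  # tf.convert_to_tensor densifies a tf.IndexedSlices into a regular Tensor;
  # a dense gradient passes through unchanged, so this maps uniformly over
  # every (gradient, variable) pair.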
| return [(tf.convert_to_tensor(g), v) for g, v in grads_and_vars]
|
|
|
|
|
| def neumf_model_fn(features, labels, mode, params):
|
| """Model Function for NeuMF estimator."""
|
| if params.get("use_seed"):
|
    tf.compat.v1.set_random_seed(stat_utils.random_int32())
|
|
|
| users = features[movielens.USER_COLUMN]
|
| items = features[movielens.ITEM_COLUMN]
|
|
|
| user_input = tf_keras.layers.Input(tensor=users)
|
| item_input = tf_keras.layers.Input(tensor=items)
|
| logits = construct_model(user_input, item_input, params).output
|
|
|
|
|
| softmax_logits = ncf_common.convert_to_softmax_logits(logits)
|
|
|
| if mode == tf_estimator.ModeKeys.EVAL:
|
| duplicate_mask = tf.cast(features[rconst.DUPLICATE_MASK], tf.float32)
|
| return _get_estimator_spec_with_metrics(
|
| logits,
|
| softmax_logits,
|
| duplicate_mask,
|
| params["num_neg"],
|
| params["match_mlperf"],
|
| use_tpu_spec=params["use_tpu"])
|
|
|
| elif mode == tf_estimator.ModeKeys.TRAIN:
|
| labels = tf.cast(labels, tf.int32)
|
| valid_pt_mask = features[rconst.VALID_POINT_MASK]
|
|
|
| optimizer = tf.compat.v1.train.AdamOptimizer(
|
| learning_rate=params["learning_rate"],
|
| beta1=params["beta1"],
|
| beta2=params["beta2"],
|
| epsilon=params["epsilon"])
|
| if params["use_tpu"]:
|
| optimizer = tf.compat.v1.tpu.CrossShardOptimizer(optimizer)
|
|
|
| loss = tf.compat.v1.losses.sparse_softmax_cross_entropy(
|
| labels=labels,
|
| logits=softmax_logits,
|
| weights=tf.cast(valid_pt_mask, tf.float32))
|
|
|
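  # Export the loss under a stable name so that logging hooks can find it.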
| tf.identity(loss, name="cross_entropy")
|
|
|
| global_step = tf.compat.v1.train.get_global_step()
|
| tvars = tf.compat.v1.trainable_variables()
|
| gradients = optimizer.compute_gradients(
|
| loss, tvars, colocate_gradients_with_ops=True)
|
| gradients = sparse_to_dense_grads(gradients)
|
| minimize_op = optimizer.apply_gradients(
|
| gradients, global_step=global_step, name="train")
|
| update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)
|
| train_op = tf.group(minimize_op, update_ops)
|
|
|
| return tf_estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
|
|
|
| else:
|
| raise NotImplementedError
|
|
|
|
|
| def _strip_first_and_last_dimension(x, batch_size):
|
| return tf.reshape(x[0, :], (batch_size,))
|
|
|
|
|
| def construct_model(user_input: tf.Tensor, item_input: tf.Tensor,
|
| params: Dict[Text, Any]) -> tf_keras.Model:
|
| """Initialize NeuMF model.
|
|
|
| Args:
|
    user_input: A Keras input layer for users.

    item_input: A Keras input layer for items.
|
| params: Dict of hyperparameters.
|
|
|
| Raises:
|
    ValueError: if the size of the first model layer is not even.
|
| Returns:
|
| model: a keras Model for computing the logits
|
| """
|
| num_users = params["num_users"]
|
| num_items = params["num_items"]
|
|
|
| model_layers = params["model_layers"]
|
|
|
| mf_regularization = params["mf_regularization"]
|
| mlp_reg_layers = params["mlp_reg_layers"]
|
|
|
| mf_dim = params["mf_dim"]
|
|
|
| if model_layers[0] % 2 != 0:
|
    raise ValueError("The first layer size should be a multiple of 2!")
|
|
|
|
|
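  # Initializer for the embedding layers.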
| embedding_initializer = "glorot_uniform"
|
|
|
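  # The GMF and MLP branches each need their own latent vector, so a single
  # embedding table of width mf_dim + model_layers[0] // 2 is learned per
  # input and then sliced: the first mf_dim columns feed GMF, and the
  # remaining model_layers[0] // 2 columns feed the MLP.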
| def mf_slice_fn(x):
|
| x = tf.squeeze(x, [1])
|
| return x[:, :mf_dim]
|
|
|
| def mlp_slice_fn(x):
|
| x = tf.squeeze(x, [1])
|
| return x[:, mf_dim:]
|
|
|
|
|
|
|
| embedding_user = tf_keras.layers.Embedding(
|
| num_users,
|
| mf_dim + model_layers[0] // 2,
|
| embeddings_initializer=embedding_initializer,
|
| embeddings_regularizer=tf_keras.regularizers.l2(mf_regularization),
|
| input_length=1,
|
| name="embedding_user")(
|
| user_input)
|
|
|
| embedding_item = tf_keras.layers.Embedding(
|
| num_items,
|
| mf_dim + model_layers[0] // 2,
|
| embeddings_initializer=embedding_initializer,
|
| embeddings_regularizer=tf_keras.regularizers.l2(mf_regularization),
|
| input_length=1,
|
| name="embedding_item")(
|
| item_input)
|
|
|
|
|
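  # GMF part: slice out the MF portion of each embedding.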
| mf_user_latent = tf_keras.layers.Lambda(
|
| mf_slice_fn, name="embedding_user_mf")(
|
| embedding_user)
|
| mf_item_latent = tf_keras.layers.Lambda(
|
| mf_slice_fn, name="embedding_item_mf")(
|
| embedding_item)
|
|
|
|
|
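  # MLP part: slice out the MLP portion of each embedding.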
| mlp_user_latent = tf_keras.layers.Lambda(
|
| mlp_slice_fn, name="embedding_user_mlp")(
|
| embedding_user)
|
| mlp_item_latent = tf_keras.layers.Lambda(
|
| mlp_slice_fn, name="embedding_item_mlp")(
|
| embedding_item)
|
|
|
|
|
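  # GMF branch: element-wise product of the user and item MF latents.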
| mf_vector = tf_keras.layers.multiply([mf_user_latent, mf_item_latent])
|
|
|
|
|
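  # MLP branch: concatenate the user and item latents, then apply the tower
  # of dense layers below.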
| mlp_vector = tf_keras.layers.concatenate([mlp_user_latent, mlp_item_latent])
|
|
|
| num_layer = len(model_layers)
|
| for layer in xrange(1, num_layer):
|
| model_layer = tf_keras.layers.Dense(
|
| model_layers[layer],
|
| kernel_regularizer=tf_keras.regularizers.l2(mlp_reg_layers[layer]),
|
| activation="relu")
|
| mlp_vector = model_layer(mlp_vector)
|
|
|
|
|
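  # Concatenate the GMF and MLP branches into the final prediction vector.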
| predict_vector = tf_keras.layers.concatenate([mf_vector, mlp_vector])
|
|
|
|
|
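  # Final prediction layer: a single linear unit producing the logit.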
| logits = tf_keras.layers.Dense(
|
| 1,
|
| activation=None,
|
| kernel_initializer="lecun_uniform",
|
| name=movielens.RATING_COLUMN)(
|
| predict_vector)
|
|
|
|
|
| model = tf_keras.models.Model([user_input, item_input], logits)
|
| model.summary()
|
| sys.stdout.flush()
|
|
|
| return model
|
|
|
|
|
| def _get_estimator_spec_with_metrics(logits: tf.Tensor,
|
| softmax_logits: tf.Tensor,
|
| duplicate_mask: tf.Tensor,
|
| num_training_neg: int,
|
| match_mlperf: bool = False,
|
                                     use_tpu_spec: bool = False):

  """Returns an EstimatorSpec that includes the metrics."""
|
  (cross_entropy, metric_fn, in_top_k, ndcg,

   metric_weights) = compute_eval_loss_and_metrics_helper(
|
| logits,
|
| softmax_logits,
|
| duplicate_mask,
|
| num_training_neg,
|
| match_mlperf)
|
|
|
| if use_tpu_spec:
|
| return tf_estimator.tpu.TPUEstimatorSpec(
|
| mode=tf_estimator.ModeKeys.EVAL,
|
| loss=cross_entropy,
|
| eval_metrics=(metric_fn, [in_top_k, ndcg, metric_weights]))
|
|
|
| return tf_estimator.EstimatorSpec(
|
| mode=tf_estimator.ModeKeys.EVAL,
|
| loss=cross_entropy,
|
| eval_metric_ops=metric_fn(in_top_k, ndcg, metric_weights))
|
|
|
|
|
| def compute_eval_loss_and_metrics_helper(logits: tf.Tensor,
|
| softmax_logits: tf.Tensor,
|
| duplicate_mask: tf.Tensor,
|
| num_training_neg: int,
|
| match_mlperf: bool = False):
|
| """Model evaluation with HR and NDCG metrics.
|
|
|
  The evaluation protocol ranks each user's test item (the ground-truth item)

  among 999 randomly chosen items that the user has not interacted with.
|
| The performance of the ranked list is judged by Hit Ratio (HR) and Normalized
|
| Discounted Cumulative Gain (NDCG).
|
|
|
| For evaluation, the ranked list is truncated at 10 for both metrics. As such,
|
| the HR intuitively measures whether the test item is present on the top-10
|
| list, and the NDCG accounts for the position of the hit by assigning higher
|
| scores to hits at top ranks. Both metrics are calculated for each test user,
|
| and the average scores are reported.
|
|
|
| If `match_mlperf` is True, then the HR and NDCG computations are done in a
|
| slightly unusual way to match the MLPerf reference implementation.
|
  Specifically, if the evaluation negatives contain duplicate items, they are

  treated as if each item appeared only once. Effectively, for duplicate items

  in a row, the predicted score for all but one of the items is set to

  -infinity.
|
|
|
  For example, suppose we have the following inputs:
|
| logits_by_user: [[ 2, 3, 3],
|
| [ 5, 4, 4]]
|
|
|
| items_by_user: [[10, 20, 20],
|
| [30, 40, 40]]
|
|
|
  # Note: items_by_user is not explicitly present. Instead, the relevant

  # information is contained within `duplicate_mask`.
|
|
|
| top_k: 2
|
|
|
| Then with match_mlperf=True, the HR would be 2/2 = 1.0. With
|
| match_mlperf=False, the HR would be 1/2 = 0.5. This is because each user has
|
| predicted scores for only 2 unique items: 10 and 20 for the first user, and 30
|
  and 40 for the second. Therefore, with match_mlperf=True, it's guaranteed that

  the first item's score is in the top 2. With match_mlperf=False, this function

  would compute that the first user's first item is not in the top 2, because

  item 20 has a higher score and occurs twice.
|
|
|
| Args:
|
    logits: A tensor containing the predicted logits for each user. The shape

      of logits is (num_users_per_batch * (1 + NUM_EVAL_NEGATIVES),). Logits

      for a user are grouped, and the last element of the group is the true

      element.
|
    softmax_logits: The same tensor, but with a column of zeros prepended.
|
| duplicate_mask: A vector with the same shape as logits, with a value of 1 if
|
| the item corresponding to the logit at that position has already appeared
|
| for that user.
|
| num_training_neg: The number of negatives per positive during training.
|
| match_mlperf: Use the MLPerf reference convention for computing rank.
|
|
|
| Returns:
|
    cross_entropy: The loss.

    metric_fn: The metrics function.

    in_top_k: The hit rate metric.

    ndcg: The NDCG metric.

    metric_weights: The metric weights.
|
| """
|
| in_top_k, ndcg, metric_weights, logits_by_user = compute_top_k_and_ndcg(
|
| logits, duplicate_mask, match_mlperf)
|
|
|
|
|
|
|
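  # Eval labels can be reconstructed on the fly: within each user's group of
  # 1 + NUM_EVAL_NEGATIVES examples the true item sits at index
  # NUM_EVAL_NEGATIVES, so a one-hot at that position recovers the labels.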
| eval_labels = tf.reshape(
|
| shape=(-1,),
|
| tensor=tf.one_hot(
|
| tf.zeros(shape=(logits_by_user.shape[0],), dtype=tf.int32) +
|
| rconst.NUM_EVAL_NEGATIVES,
|
| logits_by_user.shape[1],
|
| dtype=tf.int32))
|
|
|
| eval_labels_float = tf.cast(eval_labels, tf.float32)
|
|
|
|
|
|
|
|
|
|
|
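  # The ratio of negatives to positives is much higher during evaluation
  # (NUM_EVAL_NEGATIVES to 1) than during training (num_training_neg to 1).
  # Rescale the negative example weights so that the eval loss remains
  # comparable to the training loss.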
| negative_scale_factor = num_training_neg / rconst.NUM_EVAL_NEGATIVES
|
| example_weights = ((eval_labels_float +
|
| (1 - eval_labels_float) * negative_scale_factor) *
|
| (1 + rconst.NUM_EVAL_NEGATIVES) / (1 + num_training_neg))
|
|
|
|
|
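  # Tile the per-user metric weights so there is one weight per example.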
| expanded_metric_weights = tf.reshape(
|
| tf.tile(metric_weights[:, tf.newaxis],
|
| (1, rconst.NUM_EVAL_NEGATIVES + 1)), (-1,))
|
|
|
|
|
| example_weights *= tf.cast(expanded_metric_weights, tf.float32)
|
|
|
| cross_entropy = tf.compat.v1.losses.sparse_softmax_cross_entropy(
|
| logits=softmax_logits, labels=eval_labels, weights=example_weights)
|
|
|
| def metric_fn(top_k_tensor, ndcg_tensor, weight_tensor):
|
| return {
|
| rconst.HR_KEY:
|
| tf.compat.v1.metrics.mean(
|
| top_k_tensor, weights=weight_tensor,
|
| name=rconst.HR_METRIC_NAME),
|
| rconst.NDCG_KEY:
|
| tf.compat.v1.metrics.mean(
|
| ndcg_tensor,
|
| weights=weight_tensor,
|
| name=rconst.NDCG_METRIC_NAME)
|
| }
|
|
|
| return cross_entropy, metric_fn, in_top_k, ndcg, metric_weights
|
|
|
|
|
| def compute_top_k_and_ndcg(logits: tf.Tensor,
|
| duplicate_mask: tf.Tensor,
|
| match_mlperf: bool = False):
|
| """Compute inputs of metric calculation.
|
|
|
| Args:
|
    logits: A tensor containing the predicted logits for each user. The shape

      of logits is (num_users_per_batch * (1 + NUM_EVAL_NEGATIVES),). Logits

      for a user are grouped, and the last element of the group is the true

      element.
|
| duplicate_mask: A vector with the same shape as logits, with a value of 1 if
|
| the item corresponding to the logit at that position has already appeared
|
| for that user.
|
| match_mlperf: Use the MLPerf reference convention for computing rank.
|
|
|
| Returns:
|
    in_top_k, ndcg and weights, all of which have size (num_users_in_batch,), and
|
| logits_by_user which has size
|
| (num_users_in_batch, (rconst.NUM_EVAL_NEGATIVES + 1)).
|
| """
|
| logits_by_user = tf.reshape(logits, (-1, rconst.NUM_EVAL_NEGATIVES + 1))
|
| duplicate_mask_by_user = tf.cast(
|
| tf.reshape(duplicate_mask, (-1, rconst.NUM_EVAL_NEGATIVES + 1)),
|
| logits_by_user.dtype)
|
|
|
| if match_mlperf:
|
|
|
|
|
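    # Set the logits of duplicate items to the minimum value of the dtype so
    # that all but one copy of a duplicated item drop out of the ranking, as
    # in the MLPerf reference implementation.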
| logits_by_user *= (1 - duplicate_mask_by_user)
|
| logits_by_user += duplicate_mask_by_user * logits_by_user.dtype.min
|
|
|
|
|
|
|
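  # Rank each user's items by sorting the logits in descending order.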
| sort_indices = tf.argsort(logits_by_user, axis=1, direction="DESCENDING")
|
|
|
|
|
|
|
|
|
|
|
|
|
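  # The true item is at index NUM_EVAL_NEGATIVES before sorting, so marking
  # where that index lands in sort_indices and taking a dot product with the
  # column positions yields the true item's zero-based rank.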
| one_hot_position = tf.cast(
|
| tf.equal(sort_indices, rconst.NUM_EVAL_NEGATIVES), tf.int32)
|
| sparse_positions = tf.multiply(
|
| one_hot_position,
|
| tf.range(logits_by_user.shape[1])[tf.newaxis, :])
|
| position_vector = tf.reduce_sum(sparse_positions, axis=1)
|
|
|
| in_top_k = tf.cast(tf.less(position_vector, rconst.TOP_K), tf.float32)
|
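  # With a single relevant item at zero-based rank r, NDCG reduces to
  # 1 / log2(r + 2); log(2) / log(r + 2) is the same value by change of base.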
| ndcg = tf.math.log(2.) / tf.math.log(tf.cast(position_vector, tf.float32) + 2)
|
| ndcg *= in_top_k
|
|
|
|
|
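  # A padded row marks every element after the first as a duplicate, so rows
  # with NUM_EVAL_NEGATIVES duplicates are given zero weight in the metrics.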
| metric_weights = tf.not_equal(
|
| tf.reduce_sum(duplicate_mask_by_user, axis=1), rconst.NUM_EVAL_NEGATIVES)
|
|
|
| return in_top_k, ndcg, metric_weights, logits_by_user
|
|
|