| ''' |
| Reference: https://github.com/openai/imitation |
| I follow the architecture from the official repository |
| ''' |
| import tensorflow as tf |
| import numpy as np |
|
|
| from baselines.common.mpi_running_mean_std import RunningMeanStd |
| from baselines.common import tf_util as U |
|
|
def logsigmoid(a):
    """Numerically stable log(sigmoid(a)); equivalent to tf.log(tf.sigmoid(a)).

    Uses the identity log(sigmoid(a)) = -softplus(-a), which avoids
    underflow of sigmoid for very negative inputs.
    """
    neg_log_sig = tf.nn.softplus(-a)
    return -neg_log_sig
|
|
| """ Reference: https://github.com/openai/imitation/blob/99fbccf3e060b6e6c739bdf209758620fcdefd3c/policyopt/thutil.py#L48-L51""" |
def logit_bernoulli_entropy(logits):
    """Entropy of a Bernoulli distribution parameterized by `logits`.

    Computed directly from logits for numerical stability, using
    log(sigmoid(x)) = -softplus(-x) (the `logsigmoid` identity inlined).
    Reference: openai/imitation policyopt/thutil.py (L48-51).
    """
    one_minus_p = 1. - tf.nn.sigmoid(logits)
    return one_minus_p * logits + tf.nn.softplus(-logits)
|
|
class TransitionClassifier(object):
    """GAIL discriminator over (observation, action) transitions.

    Classifies a transition as coming from the generator policy (target
    label 0) or from the expert demonstrations (target label 1), and
    exposes a reward op derived from the discriminator output for
    training the generator policy.

    NOTE(review): uses `tf.contrib.layers` (TF 1.x only) alongside
    `tf.compat.v1` — this class presumably requires TensorFlow 1.x;
    confirm the intended TF version.
    """

    def __init__(self, env, hidden_size, entcoeff=0.001, lr_rate=1e-3, scope="adversary"):
        # env: gym-style environment; only observation_space/action_space
        #   shapes are read here.
        # hidden_size: width of both hidden layers of the classifier MLP.
        # entcoeff: weight of the Bernoulli-entropy regularizer.
        # lr_rate: NOTE(review): unused in the code visible here — the
        #   optimizer step presumably lives in the caller; confirm before use.
        # scope: variable-scope name holding all discriminator variables.
        self.scope = scope
        self.observation_shape = env.observation_space.shape
        self.actions_shape = env.action_space.shape
        # NOTE(review): pairs obs/action dims element-wise; assumes both
        # spaces are flat 1-D vectors — confirm for the envs used.
        self.input_shape = tuple([o+a for o, a in zip(self.observation_shape, self.actions_shape)])
        self.num_actions = env.action_space.shape[0]
        self.hidden_size = hidden_size
        self.build_ph()
        # Two branches of the same network: fresh variables for the
        # generator branch, shared (reused) variables for the expert branch.
        generator_logits = self.build_graph(self.generator_obs_ph, self.generator_acs_ph, reuse=False)
        expert_logits = self.build_graph(self.expert_obs_ph, self.expert_acs_ph, reuse=True)
        # Accuracy diagnostics: generator samples should score < 0.5,
        # expert samples > 0.5 after the sigmoid.
        generator_acc = tf.reduce_mean(input_tensor=tf.cast(tf.nn.sigmoid(generator_logits) < 0.5, dtype=tf.float32))
        expert_acc = tf.reduce_mean(input_tensor=tf.cast(tf.nn.sigmoid(expert_logits) > 0.5, dtype=tf.float32))
        # Standard GAN discriminator cross-entropy: push generator logits
        # toward label 0 and expert logits toward label 1.
        generator_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=generator_logits, labels=tf.zeros_like(generator_logits))
        generator_loss = tf.reduce_mean(input_tensor=generator_loss)
        expert_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=expert_logits, labels=tf.ones_like(expert_logits))
        expert_loss = tf.reduce_mean(input_tensor=expert_loss)
        # Entropy bonus over all logits; entered into the total loss with a
        # negative sign so that maximizing entropy discourages the
        # discriminator from becoming over-confident.
        logits = tf.concat([generator_logits, expert_logits], 0)
        entropy = tf.reduce_mean(input_tensor=logit_bernoulli_entropy(logits))
        entropy_loss = -entcoeff*entropy
        # Reported loss terms, kept in lockstep with loss_name for logging.
        self.losses = [generator_loss, expert_loss, entropy, entropy_loss, generator_acc, expert_acc]
        self.loss_name = ["generator_loss", "expert_loss", "entropy", "entropy_loss", "generator_acc", "expert_acc"]
        self.total_loss = generator_loss + expert_loss + entropy_loss
        # GAIL reward for the generator: -log(1 - D(s, a)); the 1e-8 term
        # guards against log(0) when the sigmoid saturates at 1.
        self.reward_op = -tf.math.log(1-tf.nn.sigmoid(generator_logits)+1e-8)
        var_list = self.get_trainable_variables()
        # Callable: (gen_obs, gen_acs, exp_obs, exp_acs) ->
        #   losses + [flat gradient of total_loss w.r.t. var_list].
        self.lossandgrad = U.function([self.generator_obs_ph, self.generator_acs_ph, self.expert_obs_ph, self.expert_acs_ph],
                                      self.losses + [U.flatgrad(self.total_loss, var_list)])

    def build_ph(self):
        """Create the four float32 input placeholders (batch dim is None)."""
        self.generator_obs_ph = tf.compat.v1.placeholder(tf.float32, (None, ) + self.observation_shape, name="observations_ph")
        self.generator_acs_ph = tf.compat.v1.placeholder(tf.float32, (None, ) + self.actions_shape, name="actions_ph")
        self.expert_obs_ph = tf.compat.v1.placeholder(tf.float32, (None, ) + self.observation_shape, name="expert_observations_ph")
        self.expert_acs_ph = tf.compat.v1.placeholder(tf.float32, (None, ) + self.actions_shape, name="expert_actions_ph")

    def build_graph(self, obs_ph, acs_ph, reuse=False):
        """Map (obs, action) to a single classification logit.

        Architecture: observation normalization (running mean/std), concat
        with the action, then a two-hidden-layer tanh MLP with a linear
        1-unit output. With reuse=True, variables created by a previous
        call under the same scope are shared instead of re-created.
        """
        with tf.compat.v1.variable_scope(self.scope):
            if reuse:
                tf.compat.v1.get_variable_scope().reuse_variables()

            # NOTE(review): self.obs_rms is rebound on every call, including
            # the reuse=True expert branch — confirm this double construction
            # matches the reference implementation's intent.
            with tf.compat.v1.variable_scope("obfilter"):
                self.obs_rms = RunningMeanStd(shape=self.observation_shape)
            obs = (obs_ph - self.obs_rms.mean) / self.obs_rms.std
            _input = tf.concat([obs, acs_ph], axis=1)  # normalized obs + action
            p_h1 = tf.contrib.layers.fully_connected(_input, self.hidden_size, activation_fn=tf.nn.tanh)
            p_h2 = tf.contrib.layers.fully_connected(p_h1, self.hidden_size, activation_fn=tf.nn.tanh)
            logits = tf.contrib.layers.fully_connected(p_h2, 1, activation_fn=tf.identity)
            return logits

    def get_trainable_variables(self):
        """Return all trainable variables under this discriminator's scope."""
        return tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.TRAINABLE_VARIABLES, self.scope)

    def get_reward(self, obs, acs):
        """Evaluate the GAIL reward op for one transition or a batch.

        1-D obs/acs are promoted to a batch of one. Requires a default
        TF session to be active; returns the reward array from sess.run.
        """
        sess = tf.compat.v1.get_default_session()
        if len(obs.shape) == 1:
            obs = np.expand_dims(obs, 0)
        if len(acs.shape) == 1:
            acs = np.expand_dims(acs, 0)
        feed_dict = {self.generator_obs_ph: obs, self.generator_acs_ph: acs}
        reward = sess.run(self.reward_op, feed_dict)
        return reward
|
|