| import tensorflow as tf |
| from tensorflow.python.ops import rnn_cell |
| from tensorflow.python.ops import seq2seq |
| import random |
| import numpy as np |
|
|
| class Model(): |
| def __init__(self, args, infer=False): |
| self.args = args |
| if infer: |
| args.batch_size = 1 |
| args.seq_length = 1 |
|
|
| if args.model == 'rnn': |
| cell_fn = rnn_cell.BasicRNNCell |
| elif args.model == 'gru': |
| cell_fn = rnn_cell.GRUCell |
| elif args.model == 'lstm': |
| cell_fn = rnn_cell.BasicLSTMCell |
| else: |
| raise Exception("model type not supported: {}".format(args.model)) |
|
|
| cell = cell_fn(args.rnn_size) |
|
|
| self.cell = cell = rnn_cell.MultiRNNCell([cell] * args.num_layers) |
|
|
| self.input_data = tf.placeholder(tf.int32, [args.batch_size, args.seq_length]) |
| self.targets = tf.placeholder(tf.int32, [args.batch_size, args.seq_length]) |
| self.initial_state = cell.zero_state(args.batch_size, tf.float32) |
|
|
| with tf.variable_scope('rnnlm'): |
| softmax_w = tf.get_variable("softmax_w", [args.rnn_size, args.vocab_size]) |
| softmax_b = tf.get_variable("softmax_b", [args.vocab_size]) |
| with tf.device("/cpu:0"): |
| embedding = tf.get_variable("embedding", [args.vocab_size, args.rnn_size]) |
| inputs = tf.split(1, args.seq_length, tf.nn.embedding_lookup(embedding, self.input_data)) |
| inputs = [tf.squeeze(input_, [1]) for input_ in inputs] |
|
|
| def loop(prev, _): |
| prev = tf.matmul(prev, softmax_w) + softmax_b |
| prev_symbol = tf.stop_gradient(tf.argmax(prev, 1)) |
| return tf.nn.embedding_lookup(embedding, prev_symbol) |
|
|
| outputs, last_state = seq2seq.rnn_decoder(inputs, self.initial_state, cell, loop_function=loop if infer else None, scope='rnnlm') |
| output = tf.reshape(tf.concat(1, outputs), [-1, args.rnn_size]) |
| self.logits = tf.matmul(output, softmax_w) + softmax_b |
| self.probs = tf.nn.softmax(self.logits) |
| loss = seq2seq.sequence_loss_by_example([self.logits], |
| [tf.reshape(self.targets, [-1])], |
| [tf.ones([args.batch_size * args.seq_length])], |
| args.vocab_size) |
| self.cost = tf.reduce_sum(loss) / args.batch_size / args.seq_length |
| self.final_state = last_state |
| self.lr = tf.Variable(0.0, trainable=False) |
| tvars = tf.trainable_variables() |
| grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars), |
| args.grad_clip) |
| optimizer = tf.train.AdamOptimizer(self.lr) |
| self.train_op = optimizer.apply_gradients(zip(grads, tvars)) |
|
|
| def sample(self, sess, words, vocab, num=200, prime='first all', sampling_type=1): |
| state = sess.run(self.cell.zero_state(1, tf.float32)) |
| if not len(prime) or prime == " ": |
| prime = random.choice(list(vocab.keys())) |
| print (prime) |
| for word in prime.split()[:-1]: |
| print (word) |
| x = np.zeros((1, 1)) |
| x[0, 0] = vocab.get(word,0) |
| feed = {self.input_data: x, self.initial_state:state} |
| [state] = sess.run([self.final_state], feed) |
| |
| def weighted_pick(weights): |
| t = np.cumsum(weights) |
| s = np.sum(weights) |
| return(int(np.searchsorted(t, np.random.rand(1)*s))) |
|
|
| ret = prime |
| word = prime.split()[-1] |
| for n in range(num): |
| x = np.zeros((1, 1)) |
| x[0, 0] = vocab.get(word,0) |
| feed = {self.input_data: x, self.initial_state:state} |
| [probs, state] = sess.run([self.probs, self.final_state], feed) |
| p = probs[0] |
|
|
| if sampling_type == 0: |
| sample = np.argmax(p) |
| elif sampling_type == 2: |
| if word == '\n': |
| sample = weighted_pick(p) |
| else: |
| sample = np.argmax(p) |
| else: |
| sample = weighted_pick(p) |
|
|
| pred = words[sample] |
| ret += ' ' + pred |
| word = pred |
| return ret |
|
|