Spaces:
Runtime error
| import torch | |
| import torch.nn as nn | |
| import torch.nn.utils.rnn as R | |
| import torch.nn.functional as F | |
| from torch.autograd import Variable | |
| import numpy as np | |
| class PointerNetworks(nn.Module): | |
| def __init__(self,voca_size, voc_embeddings,word_dim, hidden_dim,is_bi_encoder_rnn,rnn_type,rnn_layers, | |
| dropout_prob,use_cuda,finedtuning,isbanor,batchsize): | |
| super(PointerNetworks,self).__init__() | |
| self.word_dim = word_dim | |
| self.voca_size = voca_size | |
| self.hidden_dim = hidden_dim | |
| self.dropout_prob = dropout_prob | |
| self.is_bi_encoder_rnn = is_bi_encoder_rnn | |
| self.num_rnn_layers = rnn_layers | |
| self.rnn_type = rnn_type | |
| self.voc_embeddings = voc_embeddings | |
| self.finedtuning = finedtuning | |
| self.batchsize = batchsize | |
| self.nnDropout = nn.Dropout(dropout_prob) | |
| self.isbanor = isbanor | |
| if rnn_type in ['LSTM', 'GRU']: | |
| self.decoder_rnn = getattr(nn, rnn_type)(input_size=word_dim, | |
| hidden_size=2 * hidden_dim if is_bi_encoder_rnn else hidden_dim, | |
| num_layers=rnn_layers, | |
| dropout=dropout_prob, | |
| batch_first=True) | |
| self.encoder_rnn = getattr(nn, rnn_type)(input_size=word_dim, | |
| hidden_size=hidden_dim, | |
| num_layers=rnn_layers, | |
| bidirectional=is_bi_encoder_rnn, | |
| dropout=dropout_prob, | |
| batch_first=True) | |
| else: | |
| print('rnn_type should be LSTM,GRU') | |
| self.use_cuda = False | |
| self.nnSELU = nn.SELU() | |
| self.nnEm = nn.Embedding(self.voca_size,self.word_dim,padding_idx=2000001) | |
| #self.nnEm = nn.Embedding.from_pretrained(self.voc_embeddings,freeze=self.finedtuning,padding_idx=-1) | |
| self.initEmbeddings(self.voc_embeddings) | |
| #if self.use_cuda: | |
| # self.nnEm = self.nnEm.cuda() | |
| if self.is_bi_encoder_rnn: | |
| self.num_encoder_bi = 2 | |
| else: | |
| self.num_encoder_bi = 1 | |
| self.nnW1 = nn.Linear(self.num_encoder_bi * hidden_dim, self.num_encoder_bi * hidden_dim, bias=False) | |
| self.nnW2 = nn.Linear(self.num_encoder_bi * hidden_dim, self.num_encoder_bi * hidden_dim, bias=False) | |
| self.nnV = nn.Linear(self.num_encoder_bi * hidden_dim, 1, bias=False) | |
| def initEmbeddings(self,weights): | |
| self.nnEm.weight.data.copy_(torch.from_numpy(weights)) | |
| self.nnEm.weight.requires_grad = self.finedtuning | |
| def initHidden(self,hsize,batchsize): | |
| #hsize=self.hidden_dim | |
| #batchsize=self.batchsize | |
| if self.rnn_type == 'LSTM': | |
| h_0 = Variable(torch.zeros(self.num_encoder_bi*self.num_rnn_layers, batchsize, hsize)) | |
| c_0 = Variable(torch.zeros(self.num_encoder_bi*self.num_rnn_layers, batchsize, hsize)) | |
| #if self.use_cuda: | |
| # h_0 = h_0.cuda() | |
| # c_0 = c_0.cuda() | |
| return (h_0, c_0) | |
| else: | |
| h_0 = Variable(torch.zeros(self.num_encoder_bi*self.num_rnn_layers, batchsize, hsize)) | |
| #if self.use_cuda: | |
| # h_0 = h_0.cuda() | |
| return h_0 | |
| def _run_rnn_packed(self, cell, x, x_lens, h=None): | |
| #print(x_lens) | |
| x_packed = R.pack_padded_sequence(x, x_lens.data.tolist(), | |
| batch_first=True, enforce_sorted=False) | |
| if h is not None: | |
| output, h = cell(x_packed, h) | |
| else: | |
| output, h = cell(x_packed) | |
| output, _ = R.pad_packed_sequence(output, batch_first=True) | |
| return output, h | |
| def pointerEncoder(self,Xin,lens): | |
| self.bn_inputdata = nn.BatchNorm1d(self.word_dim, affine=False, track_running_stats=False) | |
| batch_size,maxL = Xin.size() | |
| X = self.nnEm(Xin) # N L C | |
| if self.isbanor and maxL>1: | |
| X= X.permute(0,2,1) # N C L | |
| X = self.bn_inputdata(X) | |
| X = X.permute(0, 2, 1) # N L C | |
| X = self.nnDropout(X) | |
| encoder_lstm_co_h_o = self.initHidden(self.hidden_dim, batch_size) | |
| o, h = self._run_rnn_packed(self.encoder_rnn, X, lens, encoder_lstm_co_h_o) # batch_first=True | |
| o = o.contiguous() | |
| o = self.nnDropout(o) | |
| return o,h | |
| def pointerLayer(self,en,di): | |
| """ | |
| :param en: [L,H] | |
| :param di: [H,] | |
| :return: | |
| """ | |
| WE = self.nnW1(en) | |
| exdi = di.expand_as(en) | |
| WD = self.nnW2(exdi) | |
| nnV = self.nnV(self.nnSELU(WE+WD)) | |
| nnV = nnV.permute(1,0) | |
| nnV = self.nnSELU(nnV) | |
| #TODO: for log loss | |
| att_weights = F.softmax(nnV) | |
| logits = F.log_softmax(nnV) | |
| return logits,att_weights | |
| def training_decoder(self,hn,hend,X,Xindex,Yindex,lens): | |
| """ | |
| """ | |
| loss_function = nn.NLLLoss() | |
| batch_loss =0 | |
| LoopN =0 | |
| batch_size = len(lens) | |
| for i in range(len(lens)): #Loop batch size | |
| curX_index = Xindex[i] | |
| #print(curX_index) | |
| #print() | |
| curY_index = Yindex[i] | |
| curL = lens[i] | |
| curX = X[i] | |
| #print(curX) | |
| x_index_var = Variable(torch.from_numpy(curX_index.astype(np.int64))) | |
| #if self.use_cuda: | |
| # x_index_var = x_index_var.cuda() | |
| cur_lookup = curX[x_index_var] | |
| #print(cur_lookup) | |
| curX_vectors = self.nnEm(cur_lookup) # output: [seq,features] | |
| curX_vectors = curX_vectors.unsqueeze(0) # [batch, seq, features] | |
| if self.rnn_type =='LSTM':# need h_end,c_end | |
| h_end = hend[0].permute(1, 0, 2).contiguous().view(batch_size, self.num_rnn_layers,-1) | |
| c_end = hend[1].permute(1, 0, 2).contiguous().view(batch_size, self.num_rnn_layers,-1) | |
| curh0 = h_end[i].unsqueeze(0).permute(1, 0, 2) | |
| curc0 = c_end[i].unsqueeze(0).permute(1, 0, 2) | |
| h_pass = (curh0,curc0) | |
| else: | |
| h_end = hend.permute(1, 0, 2).contiguous().view(batch_size, self.num_rnn_layers,-1) | |
| curh0 = h_end[i].unsqueeze(0).permute(1, 0, 2) | |
| h_pass = curh0 | |
| decoder_out,_ = self.decoder_rnn(curX_vectors,h_pass) | |
| decoder_out = decoder_out.squeeze(0) #[seq,features] | |
| curencoder_hn = hn[i,0:curL,:] # hn[batch,seq,H] -->[seq,H] i is loop batch size | |
| for j in range(len(decoder_out)): #Loop di | |
| #print(len(decoder_out),curY_index) | |
| cur_dj = decoder_out[j] | |
| cur_groundy = curY_index[j] | |
| cur_start_index = curX_index[j] | |
| predict_range = list(range(cur_start_index,curL)) | |
| # TODO: make it point backward, only consider predict_range in current time step | |
| # align groundtruth | |
| cur_groundy_var = Variable(torch.LongTensor([int(cur_groundy) - int(cur_start_index)])) | |
| #if self.use_cuda: | |
| # cur_groundy_var = cur_groundy_var.cuda() | |
| curencoder_hn_back = curencoder_hn[predict_range,:] | |
| cur_logists, cur_weights = self.pointerLayer(curencoder_hn_back,cur_dj) | |
| batch_loss = batch_loss + loss_function(cur_logists,cur_groundy_var) | |
| LoopN = LoopN +1 | |
| batch_loss = batch_loss/LoopN | |
| return batch_loss | |
| def neg_log_likelihood(self,Xin,index_decoder_x, index_decoder_y,lens): | |
| ''' | |
| :param Xin: stack_x, [allseq,wordDim] | |
| :param Yin: | |
| :param lens: | |
| :return: | |
| ''' | |
| encoder_hn, encoder_h_end = self.pointerEncoder(Xin,lens) | |
| loss = self.training_decoder(encoder_hn, encoder_h_end,Xin,index_decoder_x, index_decoder_y,lens) | |
| return loss | |
| def test_decoder(self,hn,hend,X,Yindex,lens): | |
| loss_function = nn.NLLLoss() | |
| batch_loss = 0 | |
| LoopN = 0 | |
| batch_boundary =[] | |
| batch_boundary_start =[] | |
| batch_align_matrix =[] | |
| batch_size = len(lens) | |
| for i in range(len(lens)): # Loop batch size | |
| curL = lens[i] | |
| curY_index = Yindex[i] | |
| curX = X[i] | |
| cur_end_boundary =curY_index[-1] | |
| cur_boundary = [] | |
| cur_b_start = [] | |
| cur_align_matrix = [] | |
| cur_sentence_vectors = self.nnEm(curX) # output: [seq,features] | |
| if self.rnn_type =='LSTM':# need h_end,c_end | |
| h_end = hend[0].permute(1, 0, 2).contiguous().view(batch_size, self.num_rnn_layers,-1) | |
| c_end = hend[1].permute(1, 0, 2).contiguous().view(batch_size, self.num_rnn_layers,-1) | |
| curh0 = h_end[i].unsqueeze(0).permute(1, 0, 2) | |
| curc0 = c_end[i].unsqueeze(0).permute(1, 0, 2) | |
| h_pass = (curh0,curc0) | |
| else: # only need h_end | |
| h_end = hend.permute(1, 0, 2).contiguous().view(batch_size, self.num_rnn_layers,-1) | |
| curh0 = h_end[i].unsqueeze(0).permute(1, 0, 2) | |
| h_pass = curh0 | |
| curencoder_hn = hn[i, 0:curL, :] # hn[batch,seq,H] --> [seq,H] i is loop batch size | |
| Not_break = True | |
| loop_in = cur_sentence_vectors[0,:].unsqueeze(0).unsqueeze(0) #[1,1,H] | |
| loop_hc = h_pass | |
| loopstart =0 | |
| loop_j =0 | |
| while (Not_break): #if not end | |
| loop_o, loop_hc = self.decoder_rnn(loop_in,loop_hc) | |
| #TODO: make it point backward | |
| predict_range = list(range(loopstart,curL)) | |
| curencoder_hn_back = curencoder_hn[predict_range,:] | |
| cur_logists, cur_weights = self.pointerLayer(curencoder_hn_back, loop_o.squeeze(0).squeeze(0)) | |
| cur_align_vector = np.zeros(curL) | |
| cur_align_vector[predict_range]=cur_weights.data.cpu().numpy()[0] | |
| cur_align_matrix.append(cur_align_vector) | |
| #TODO:align groundtruth | |
| if loop_j > len(curY_index)-1: | |
| cur_groundy = curY_index[-1] | |
| else: | |
| cur_groundy = curY_index[loop_j] | |
| cur_groundy_var = Variable(torch.LongTensor([max(0,int(cur_groundy) - loopstart)])) | |
| #if self.use_cuda: | |
| # cur_groundy_var = cur_groundy_var.cuda() | |
| batch_loss = batch_loss + loss_function(cur_logists, cur_groundy_var) | |
| #TODO: get predicted boundary | |
| topv, topi = cur_logists.data.topk(1) | |
| pred_index = topi[0][0] | |
| #TODO: align pred_index to original seq | |
| ori_pred_index =pred_index + loopstart | |
| if cur_end_boundary == ori_pred_index: | |
| cur_boundary.append(ori_pred_index) | |
| cur_b_start.append(loopstart) | |
| Not_break = False | |
| loop_j = loop_j + 1 | |
| LoopN = LoopN + 1 | |
| break | |
| else: | |
| cur_boundary.append(ori_pred_index) | |
| loop_in = cur_sentence_vectors[ori_pred_index+1,:].unsqueeze(0).unsqueeze(0) | |
| cur_b_start.append(loopstart) | |
| loopstart = ori_pred_index+1 # start = pred_end + 1 | |
| loop_j = loop_j + 1 | |
| LoopN = LoopN + 1 | |
| #For each instance in batch | |
| batch_boundary.append(cur_boundary) | |
| batch_boundary_start.append(cur_b_start) | |
| batch_align_matrix.append(cur_align_matrix) | |
| batch_loss = batch_loss / LoopN | |
| batch_boundary=np.array(batch_boundary) | |
| batch_boundary_start = np.array(batch_boundary_start) | |
| batch_align_matrix = np.array(batch_align_matrix) | |
| return batch_loss,batch_boundary,batch_boundary_start,batch_align_matrix | |
| def predict(self,Xin,index_decoder_y,lens): | |
| batch_size = index_decoder_y.shape[0] | |
| encoder_hn, encoder_h_end = self.pointerEncoder(Xin, lens) | |
| batch_loss, batch_boundary, batch_boundary_start, batch_align_matrix = self.test_decoder(encoder_hn,encoder_h_end,Xin,index_decoder_y,lens) | |
| return batch_loss,batch_boundary,batch_boundary_start,batch_align_matrix | |