import os

# Install the Vietnamese NLP toolkit before it is imported below.
os.system("pip install underthesea")

import re
import time
import pickle
from dataclasses import dataclass

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from underthesea import word_tokenize
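
# Download the Vietnamese poem corpus used for training.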
os.system("wget https://huggingface.co/datasets/Libosa2707/vietnamese-poem/resolve/main/poems_dataset.csv")
# The second source uses the same target name; wget keeps the existing file and
# saves this one as poems_dataset.csv.1, so pd.read_csv below reads the first download.
os.system("wget https://huggingface.co/datasets/phamson02/vietnamese-poetry-corpus/resolve/main/poems_dataset.csv")
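
# Small helpers for pickling and unpickling arbitrary Python objects.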
def save_pkl(save_object, save_file):
    with open(save_file, 'wb') as f:
        pickle.dump(save_object, f, protocol=pickle.HIGHEST_PROTOCOL)


def load_pkl(load_file):
    with open(load_file, 'rb') as f:
        output = pickle.load(f)
    return output
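
# Load the corpus, inspect the first rows, and split it 70/30 into training and
# validation sets.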
data = pd.read_csv("poems_dataset.csv")
print(data.head())

train_df, val_df = train_test_split(data, test_size=0.3, random_state=42)

train_documents = train_df['content'].tolist()
val_documents = val_df['content'].tolist()
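
# Vocabulary maps tokens to integer ids and back. Ids 0-3 are reserved for the
# special tokens <pad>, <unk>, <s> and </s>; tokenization is a simple regex over
# words, punctuation and newlines, and wraps each document in <s> ... </s>.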
class Vocabulary:
    def __init__(self):
        self.word2id = dict()
        self.pad_id = 0
        self.unk_id = 1
        self.sos_id = 2
        self.eos_id = 3

        self.word2id['<pad>'] = self.pad_id
        self.word2id['<unk>'] = self.unk_id
        self.word2id['<s>'] = self.sos_id
        self.word2id['</s>'] = self.eos_id

        self.id2word = {v: k for k, v in self.word2id.items()}

    def __getitem__(self, word):
        return self.word2id.get(word, self.unk_id)

    def __contains__(self, word):
        return word in self.word2id

    def __len__(self):
        return len(self.word2id)

    def lookup_tokens(self, word_indexes: list):
        return [self.id2word[word_index] for word_index in word_indexes]

    def add(self, word):
        if word not in self:
            word_index = self.word2id[word] = len(self.word2id)
            self.id2word[word_index] = word
            return word_index
        else:
            return self[word]

    def corpus_to_tensor(self, corpus, is_tokenized=False):
        if is_tokenized:
            tokenized_corpus = corpus
        else:
            tokenized_corpus = self.tokenize_corpus(corpus)
        indicies_corpus = list()
        for document in tqdm(tokenized_corpus):
            indicies_document = torch.tensor(list(map(lambda word: self[word], document)), dtype=torch.long)
            indicies_corpus.append(indicies_document)
        return indicies_corpus

    def tensor_to_corpus(self, tensor):
        corpus = list()
        for indicies in tqdm(tensor):
            document = list(map(lambda index: self.id2word[index.item()], indicies))
            corpus.append(document)
        return corpus

    @staticmethod
    def tokenize_corpus(corpus):
        print("Tokenize the corpus...")
        tokenized_corpus = list()
        for document in tqdm(corpus):
            tokenized_document = ['<s>'] + re.findall(r'(\w+|[^\w\s]|\S+|\n)', document) + ['</s>']
            tokenized_corpus.append(tokenized_document)
        return tokenized_corpus

    @classmethod
    def from_documents(cls, documents):
        words = set(word for doc in documents for word in re.findall(r'\w+|\S|\n', doc))
        vocab = cls()
        for w in words:
            vocab.add(w)
        return vocab

    @classmethod
    def from_pretrained(cls, save_dir):
        with open(os.path.join(save_dir, "vocab.pkl"), 'rb') as file:
            pretrained_vocab = pickle.load(file)
        return cls.init_vocab_from_pretrained(pretrained_vocab)

    @staticmethod
    def init_vocab_from_pretrained(pretrained_vocab):
        vocab = Vocabulary()
        vocab.word2id.update(pretrained_vocab)
        vocab.id2word = {v: k for k, v in vocab.word2id.items()}
        return vocab

    def save_pretrained(self, save_dir):
        os.makedirs(save_dir, exist_ok=True)
        with open(os.path.join(save_dir, "vocab.pkl"), 'wb') as file:
            pickle.dump(self.word2id, file)


vocab = Vocabulary.from_documents(train_documents)
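
# Note: the vocabulary is built from the training split only, so tokens that
# appear only in validation poems will map to <unk>.

# PoemGenerationDataset turns each poem into a tensor of token ids; its
# collate_fn pads a batch, keeps the padded ids as labels, and shifts the
# inputs one position to the right for next-token prediction.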
class PoemGenerationDataset(Dataset):
    def __init__(self, documents, vocab, max_length=None):
        self.vocab = vocab
        self.sos_idx = vocab["<s>"]
        self.eos_idx = vocab["</s>"]
        self.pad_idx = vocab["<pad>"]
        self.documents = documents
        self.max_length = max_length
        self.tokenized_documents = self.vocab.tokenize_corpus(self.documents)
        self.tensor_data = self.vocab.corpus_to_tensor(self.tokenized_documents, is_tokenized=True)

    def __len__(self):
        return len(self.tensor_data)

    def __getitem__(self, idx):
        return self.tensor_data[idx]

    def shift_right(self, input_ids, pad_token=0):
        padding_column = torch.full_like(input_ids[:, :1], pad_token)
        shifted_ids = torch.cat([padding_column, input_ids[:, :-1]], dim=-1)
        return shifted_ids

    def collate_fn(self, examples):
        examples = sorted(examples, key=lambda e: len(e), reverse=True)
        if self.max_length is not None:
            # Truncate only sequences that exceed max_length and re-terminate them
            # with </s>; shorter sequences already end with </s> and are kept as-is.
            examples = [
                e if len(e) <= self.max_length
                else torch.cat([e[:self.max_length - 1], torch.tensor([self.eos_idx])])
                for e in examples
            ]
        input_ids = torch.nn.utils.rnn.pad_sequence(examples, batch_first=True, padding_value=self.pad_idx)
        labels = input_ids.clone()
        input_ids = self.shift_right(input_ids, pad_token=self.pad_idx)
        return {"inputs": input_ids, "labels": labels}
train_dataset = PoemGenerationDataset(train_documents, vocab, max_length=512)
val_dataset = PoemGenerationDataset(val_documents, vocab, max_length=512)
train_dataloader = DataLoader(train_dataset, batch_size=5, collate_fn=train_dataset.collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=5, collate_fn=val_dataset.collate_fn)
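
# Optional sanity check: pull one batch from the collate_fn; both tensors should
# have shape (batch_size, padded_seq_len).
sample_batch = next(iter(train_dataloader))
print(sample_batch["inputs"].shape, sample_batch["labels"].shape)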
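
# LSTM language model: token embeddings -> BatchNorm1d over the feature
# dimension -> multi-layer LSTM -> linear head over the vocabulary. When labels
# are provided, forward() also returns the cross-entropy loss; generate()
# samples tokens autoregressively with temperature scaling.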
class LSTMForPoemGeneration(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.embedding = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.padding_idx)
        self.batch_norm = nn.BatchNorm1d(config.hidden_size)
        self.lstm = nn.LSTM(config.hidden_size,
                            config.hidden_size,
                            num_layers=config.num_layers,
                            dropout=config.dropout,
                            batch_first=True)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, input_ids, labels=None):
        embeds = self.dropout(self.embedding(input_ids))
        # BatchNorm1d expects (batch, channels, seq_len), so permute around it.
        bn_output = self.batch_norm(embeds.permute(0, 2, 1)).permute(0, 2, 1)
        output, (hidden, cell) = self.lstm(bn_output)
        logits = self.lm_head(self.dropout(output))
        loss = None
        if labels is not None:
            # Ignore padding positions so the model is not trained to predict <pad>.
            loss_fct = nn.CrossEntropyLoss(ignore_index=self.config.padding_idx)
            labels = labels.to(logits.device)
            loss = loss_fct(logits.view(-1, logits.size(2)), labels.view(-1))
        return logits, loss

    @torch.no_grad()
    def generate(self, input_ids, max_length=100, temperature=1.0, eos_token_id=None):
        self.to(input_ids.device)
        self.eval()
        current_length = input_ids.size(1)
        while current_length < max_length:
            # Re-run the full prefix and sample the next token from the last step.
            logits, _ = self.forward(input_ids)
            logits = logits[:, -1, :] / temperature
            probs = F.softmax(logits, dim=-1)
            next_token_id = torch.multinomial(probs, num_samples=1)
            input_ids = torch.cat([input_ids, next_token_id], dim=1)
            current_length += 1
            if eos_token_id is not None and next_token_id.item() == eos_token_id:
                break
        return input_ids

    def save_pretrained(self, save_dir):
        os.makedirs(save_dir, exist_ok=True)
        torch.save(self.config, os.path.join(save_dir, "config.pt"))
        torch.save(self.state_dict(), os.path.join(save_dir, "pytorch_model.pt"))

    @classmethod
    def from_pretrained(cls, saved_dir):
        config = torch.load(os.path.join(saved_dir, "config.pt"))
        model = cls(config)
        state_dict = torch.load(os.path.join(saved_dir, "pytorch_model.pt"), map_location=torch.device('cpu'))
        model.load_state_dict(state_dict)
        return model

@dataclass
class LSTMConfig:
    vocab_size: int
    padding_idx: int
    hidden_size: int
    num_layers: int
    dropout: float = 0.3

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model_config = LSTMConfig(vocab_size=len(vocab), padding_idx=vocab.pad_id, hidden_size=128, num_layers=2)
model = LSTMForPoemGeneration(model_config).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
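
# Optional sanity check: report the number of trainable parameters.
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {num_params:,}")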

for epoch in range(20):
    model.train()
    running_loss = 0.0
    for i, batch in enumerate(tqdm(train_dataloader), 1):
        optimizer.zero_grad()
        inputs, labels = batch["inputs"].to(device), batch["labels"].to(device)
        _, loss = model(inputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Training Loss after epoch {epoch}: {running_loss / len(train_dataloader)}")

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for i, batch in enumerate(tqdm(val_dataloader), 1):
            inputs, labels = batch["inputs"].to(device), batch["labels"].to(device)
            _, loss = model(inputs, labels)
            val_loss += loss.item()
    print(f"Validation Loss after epoch {epoch}: {val_loss / len(val_dataloader)}")
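
# Persist the trained model (config + weights) and the vocabulary so they can be
# reloaded later with the from_pretrained helpers defined above.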
model.save_pretrained("poem_model")
vocab.save_pretrained("poem_model")

# Generate a continuation for a short Vietnamese prompt ("The sun rises in the east").
input_sentence = "Mặt trời mọc ở đằng đông"
# corpus_to_tensor wraps the prompt in <s> ... </s>; drop the trailing </s> so the
# model continues the prompt instead of seeing an end-of-sequence marker.
input_ids = vocab.corpus_to_tensor([input_sentence])[0][:-1].unsqueeze(0).to(device)
output = model.generate(input_ids, max_length=100, temperature=0.8, eos_token_id=vocab.eos_id)
output_text = " ".join(vocab.lookup_tokens(output.squeeze().tolist())).replace("<s>", "").replace("</s>", "")
print(output_text)
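
# A minimal sketch (not run here) of how the saved artifacts could be reloaded in
# a fresh session using the from_pretrained helpers above. The prompt string and
# the variable names below are illustrative only.
# reloaded_vocab = Vocabulary.from_pretrained("poem_model")
# reloaded_model = LSTMForPoemGeneration.from_pretrained("poem_model").to(device)
# prompt_ids = reloaded_vocab.corpus_to_tensor(["Trăng lên đầu núi"])[0][:-1].unsqueeze(0).to(device)
# generated = reloaded_model.generate(prompt_ids, max_length=100, temperature=0.8,
#                                     eos_token_id=reloaded_vocab.eos_id)
# print(" ".join(reloaded_vocab.lookup_tokens(generated.squeeze().tolist())))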