| import pandas as pd |
| import numpy as np |
| import torch |
| import torch.optim as optim |
| from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler |
| from torch import cuda |
| from transformers import GPT2LMHeadModel,GPT2Tokenizer, GPT2Config |
| import argparse |
|
|
|
|
| |
| |
|
|
# Device string used for tensors throughout this script: Apple's Metal
# backend ('mps') when PyTorch reports it as available, otherwise the CPU.
device = 'cpu'
if torch.backends.mps.is_available():
    device = 'mps'
|
|
| |
|
|
| ''' |
| from datasets import load_dataset |
| dataset1 = load_dataset("dair-ai/emotion") |
| for split, data in dataset1.items(): |
| data.to_csv(f"emotion_{split}.csv", index = None) |
| ''' |
|
|
# Integer labels of the emotion dataset -> the word used as its "summary".
_EMOTION_LABELS = {0: 'sad', 1: 'joy', 2: 'love', 3: 'anger', 4: 'fear', 5: 'surprise'}

# File name -> (text column, summary column) for the datasets whose summary
# is already stored as text.
_REVIEW_COLUMNS = {
    "amazon_review.csv": ("Text", "Summary"),
    "kindle_review.csv": ("reviewText", "summary"),
    "tweet_train.csv": ("content", "c_summary"),
}


def read_reviews(data_path, base_dir="/content/drive/MyDrive/Text_summary_datasets/",
                 n_samples=1250, random_state=42):
    """Load several CSV datasets into one frame of summarization examples.

    Each file is read from ``base_dir``, rows containing any missing value
    are dropped, and the file's own column names are mapped onto a common
    schema:

    * ``Text``     - the source text
    * ``Summary``  - the target summary (for ``emotion_train.csv`` the
      integer ``label`` decoded to an emotion word)
    * ``training`` - ``Text + 'TL;DR' + Summary``, the string the language
      model is trained on

    Files whose name is not recognised are passed through unchanged, so
    they must already provide those three columns.

    Args:
        data_path: Iterable of CSV file names, relative to ``base_dir``.
        base_dir: Directory that holds the CSV files.
        n_samples: Maximum number of rows drawn from each file. Files with
            fewer usable rows contribute all of them.
        random_state: Seed passed to ``DataFrame.sample``.

    Returns:
        A ``DataFrame`` with the columns ``Summary``, ``Text`` and
        ``training`` (empty when ``data_path`` is empty).
    """
    import os

    output_columns = ['Summary', 'Text', 'training']
    frames = []

    for path in data_path:
        df = pd.read_csv(os.path.join(base_dir, path))
        df.dropna(inplace=True)

        if path == "emotion_train.csv":
            df['Summary'] = df['label'].replace(_EMOTION_LABELS)
            df['Text'] = df['text']
            df['training'] = df['Text'] + 'TL;DR' + df['Summary']
        elif path in _REVIEW_COLUMNS:
            text_col, summary_col = _REVIEW_COLUMNS[path]
            df['Text'] = df[text_col]
            df['Summary'] = df[summary_col]
            df['training'] = df['Text'] + 'TL;DR' + df['Summary']

        # sample() raises when asked for more rows than exist, so cap the
        # request at the number of rows left after dropna().
        take = min(n_samples, len(df))
        frames.append(df.sample(n=take, random_state=random_state))

    if not frames:
        return pd.DataFrame(columns=output_columns)

    # DataFrame.append was removed in pandas 2.0; concat is the supported
    # way to stack the per-file samples.
    dataset = pd.concat(frames, ignore_index=True)
    return dataset[output_columns]
|
|
|
|
|
|
| |
|
|
class GPT2ReviewDataset(Dataset):
    """Fixed-length, pre-tokenized training examples.

    Every entry of ``reviews`` is tokenized once in the constructor (with the
    tokenizer's EOS token appended), brought to a common length by
    ``pad_truncate`` and stored as a tensor in ``self.result``; indexing the
    dataset returns those stored tensors.
    """

    def __init__(self, tokenizer, reviews, max_len):
        """Tokenize and pad all ``reviews`` up front.

        Args:
            tokenizer: Object providing ``eos_token``, ``eos_token_id`` and
                ``encode(text, max_length=..., truncation=...)``.
            reviews: Iterable of strings to train on.
            max_len: Base sequence length; see ``pad_truncate`` for the
                actual length of the stored tensors.
        """
        self.max_len = max_len
        self.tokenizer = tokenizer
        self.eos = tokenizer.eos_token
        self.eos_id = tokenizer.eos_token_id
        self.reviews = reviews
        self.result = []

        for text in reviews:
            self.result.append(self._encode_one(text))

    def _encode_one(self, text):
        """Turn one string into a padded tensor of token ids."""
        token_ids = self.tokenizer.encode(text + self.eos, max_length=512, truncation=True)
        return torch.tensor(self.pad_truncate(token_ids))

    def __len__(self):
        """Number of stored examples."""
        return len(self.result)

    def __getitem__(self, item):
        """Return the stored tensor at position ``item``."""
        return self.result[item]

    def pad_truncate(self, name):
        """Bring a list of token ids to exactly ``max_len + 4`` entries.

        Shorter lists are right-padded with the EOS id; longer lists are cut
        and closed with a single EOS id; lists that already have the target
        length are returned as-is (the same object, not a copy).
        """
        # Four slots on top of max_len; presumably room for the 'TL;DR'
        # separator tokens -- not verifiable from this class alone.
        extra_length = 4
        target_len = self.max_len + extra_length
        surplus = len(name) - target_len

        if surplus == 0:
            return name
        if surplus > 0:
            return name[:target_len - 1] + [self.eos_id]
        return name + [self.eos_id] * (-surplus)
|
|
def train(model, optimizer, dl, epochs,
          checkpoint_path='/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth',
          log_every=50):
    """Run a language-modelling training loop over ``dl``.

    Each batch serves as both input and labels (``model(batch, labels=batch)``)
    and the first element of the model output is used as the loss.

    Args:
        model: Module called as ``model(batch, labels=batch)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        dl: Iterable yielding batches of token-id tensors.
        epochs: Number of passes over ``dl``.
        checkpoint_path: Where ``torch.save`` writes the whole model; pass
            ``None`` to disable checkpointing.
        log_every: The loss is printed and a checkpoint written whenever the
            batch index is a multiple of this value; a falsy value disables
            the in-epoch logging and checkpointing.

    The model's train/eval mode is left exactly as the caller set it.
    """
    # Send batches to wherever the model's weights already live instead of
    # moving the model here: the optimizer was built by the caller, and the
    # input must be on the same device as the weights.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    for epoch in range(epochs):
        for idx, batch in enumerate(dl):
            with torch.set_grad_enabled(True):
                optimizer.zero_grad()
                if model_device is not None:
                    batch = batch.to(model_device)
                output = model(batch, labels=batch)
                loss = output[0]
                loss.backward()
                optimizer.step()

            if log_every and idx % log_every == 0:
                # Serializing the full model is expensive, so checkpoint on
                # the logging cadence rather than after every batch.
                if checkpoint_path is not None:
                    torch.save(model, checkpoint_path)
                print("loss: %f, %d" % (loss.item(), idx))

        # Always keep the state reached at the end of an epoch.
        if checkpoint_path is not None:
            torch.save(model, checkpoint_path)
| |
def main():
    """Fine-tune pretrained GPT-2 on the four datasets and save the model.

    Loads the training strings via ``read_reviews``, wraps them in a
    ``GPT2ReviewDataset``, trains for three epochs with Adam and finally
    writes the whole model object with ``torch.save``.
    """
    data_path = ["emotion_train.csv", "kindle_review.csv", "amazon_review.csv", "tweet_train.csv"]
    reviews = read_reviews(data_path)

    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

    # from_pretrained already attaches the matching config to the model.
    model = GPT2LMHeadModel.from_pretrained('gpt2')
    # The batches are trained on `device`, so the weights must live there
    # too. Move the model before the optimizer is created so the optimizer
    # is built from the parameters that are actually trained.
    model.to(device)

    max_length = 250
    optimizer = optim.Adam(params=model.parameters(), lr=3e-4)

    dataset = GPT2ReviewDataset(tokenizer, reviews['training'], max_len=max_length)
    dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

    train(model=model, optimizer=optimizer, dl=dataloader, epochs=3)

    torch.save(model, '/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth')
|
|
|
|
def topk(probs, n=9):
    """Randomly pick one index among the ``n`` highest-scoring entries.

    ``probs`` is first passed through a softmax over its last dimension.
    The ``n`` largest resulting probabilities are rescaled to sum to one and
    a single index is drawn from them with ``np.random.choice``.

    Args:
        probs: 1-D tensor of scores for each candidate.
        n: How many of the top candidates take part in the draw.

    Returns:
        The chosen candidate's index into ``probs`` as a Python ``int``.
    """
    distribution = torch.softmax(probs, dim=-1)
    top_values, top_indices = torch.topk(distribution, k=n)

    # Rescale the kept mass to 1 and hand it to NumPy for the weighted draw.
    weights = (top_values / torch.sum(top_values)).cpu().detach().numpy()
    drawn = np.random.choice(n, 1, p=weights)

    return int(top_indices[drawn][0])
|
|
def model_infer(model, tokenizer, review, max_length=30):
    """Continue ``review`` by sampling tokens from ``model`` one at a time.

    The prompt is encoded, then up to ``max_length + 1`` tokens are drawn
    with ``topk`` from the logits of the last position. Generation stops
    early as soon as the end-of-sequence token is drawn; that token is not
    added to the output.

    Args:
        model: Causal language model whose output exposes ``.logits``.
        tokenizer: Tokenizer providing ``encode``, ``decode`` and
            ``eos_token_id``.
        review: Prompt text to continue.
        max_length: Controls the sampling budget (see above).

    Returns:
        The decoded prompt followed by the generated continuation.

    Side effects:
        Switches ``model`` to evaluation mode.
    """
    token_ids = tokenizer.encode(review)

    # Dropout must be off while generating.
    model.eval()

    with torch.no_grad():
        # A single loop covers the initial draw as well, so every sampled
        # token -- including the very first one -- is checked against EOS.
        for _ in range(max_length + 1):
            input_ids = torch.tensor(token_ids).unsqueeze(0).to(device)
            logits = model(input_ids).logits[0, -1]
            next_id = topk(logits)

            if next_id == tokenizer.eos_token_id:
                break
            token_ids.append(next_id)

    return tokenizer.decode(token_ids)
| |
def interface(input, model_path='text_summary_4sets_2_550.pth', n_candidates=6):
    """Print a generated summary for the text ``input``.

    The fine-tuned model is loaded from ``model_path``, ``n_candidates``
    continuations of ``input + "TL;DR"`` are sampled, and the candidate of
    median length (index ``n_candidates // 2`` after sorting by length) is
    printed.

    Args:
        input: Text to summarize.
        model_path: File written by ``torch.save(model, ...)``.
        n_candidates: Number of summaries to sample; must be at least 1.

    Raises:
        ValueError: If ``n_candidates`` is smaller than 1.
    """
    if n_candidates < 1:
        raise ValueError("n_candidates must be at least 1")

    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

    # SECURITY: the checkpoint is a fully pickled module, so loading it
    # executes pickle code -- only load files you trust. weights_only=False
    # is required for such checkpoints on PyTorch >= 2.6.
    # Load onto the same device the inference tensors are sent to, instead
    # of assuming MPS is present.
    model = torch.load(model_path, map_location=torch.device(device), weights_only=False)

    prompt = input + "TL;DR"
    candidates = []
    for _ in range(n_candidates):
        decoded = model_infer(model, tokenizer, prompt)
        if decoded.startswith(prompt):
            # Cut the prompt off before stripping, so whitespace at the
            # start of the input cannot shift the offset.
            generated = decoded[len(prompt):]
        else:
            # Decoding did not reproduce the prompt verbatim; fall back to
            # the text after the separator.
            generated = decoded.partition("TL;DR")[2]
        candidates.append(generated.strip())

    print("summary:", sorted(candidates, key=len)[n_candidates // 2])
| |
| ''' |
| |
| |
| sample = 'Today was a hard day. I woke up feeling anxious and stressed about a meeting I had at work. The meeting did not go as I had hoped and I left disappointed. I tried to focus on other things and stay positive, but it was hard. I spent most of the evening starving and eating junk food. Not the best way to deal with my emotions, but it’s something I’m working on. Hope tomorrow will be a better day.TL;DR' |
| |
| summary = model_infer(model, tokenizer, sample).strip() |
| |
| sample |
| |
| summary[len(sample):] |
| |
| sample = 'Today was much better than yesterday. I wake up feeling more rested and ready to tackle the day. I had a productive day at work and even managed to finish a project I was struggling with. After work, I met some friends for a yoga class and it was just what I needed to relax and unwind. We went out for dinner afterwards and had a really nice time. Overall, it was a much better day than yesterday and I feel more positive about things.TL;DR' |
| |
| summary = model_infer(model, tokenizer, sample).strip() |
| |
| summary[len(sample):] |
| |
| sample = 'Today was a beautiful day. I had a good night’s sleep and was ready to start the day. I went to work and had a productive morning. I even managed to finish a project I’d been working on for weeks. After work, I ran to clear my head. It was a beautiful day and the weather was perfect for it. I came home and cooked dinner with my partner. We had a nice conversation over dinner and then spent the evening watching a movie. Overall, it was a pretty relaxing and enjoyable day.' |
| |
| summary = model_infer(model, tokenizer, sample + 'TL;DR').strip() |
| summary[len(sample)+5:] |
| ''' |
|
|
if __name__ == '__main__':
    # Command-line entry point:
    #   --train         fine-tune the model
    #   --infer TEXT    print a summary for TEXT
    parser = argparse.ArgumentParser(description="parser")
    parser.add_argument("--train", action="store_true", help="Train the model")
    parser.add_argument("--infer", type=str, help="Interact with the model")
    args = parser.parse_args()

    # --train wins when both options are given; an empty --infer string
    # counts as "not provided".
    if not (args.train or args.infer):
        print("No valid option provided. Use --train or --infer.")
    elif args.train:
        main()
    else:
        interface(args.infer)
|
|