| import warnings
|
| warnings.filterwarnings("ignore")
|
|
|
| from sklearn.metrics import accuracy_score,f1_score
|
| from datasets import load_dataset, load_from_disk, Dataset
|
| from tqdm import tqdm
|
| import datasets
|
| import torch
|
|
|
| from torch.utils.data import DataLoader
|
| from functools import partial
|
| from pathlib import Path
|
|
|
# Integer class id -> sentiment label; used to decode the dataset's label column.
dic = dict(enumerate(("negative", "neutral", "positive")))

# One prompt template per line, read from the file that sits next to this
# module. Each line is stripped; blank lines are kept as empty templates.
with open(Path(__file__).parent / 'sentiment_templates.txt') as template_file:
    templates = list(map(str.strip, template_file))
|
|
|
|
|
def format_example(example: dict) -> dict:
    """Build the prompt/answer pair for a single instruction record.

    The prompt consists of an ``Instruction:`` line, an ``Input:`` line that
    is only emitted when ``example`` has a truthy ``"input"`` entry, and a
    trailing ``"Answer: "`` cue.

    Args:
        example: Mapping with ``"instruction"`` and ``"output"`` entries and
            an optional ``"input"`` entry.

    Returns:
        A dict with the prompt under ``"context"`` and the record's
        ``"output"`` value under ``"target"``.
    """
    prompt_parts = [f"Instruction: {example['instruction']}\n"]
    if example.get("input"):
        prompt_parts.append(f"Input: {example['input']}\n")
    prompt_parts.append("Answer: ")
    return {"context": "".join(prompt_parts), "target": example["output"]}
|
|
|
def change_target(x):
    """Map free-form text to a canonical sentiment label.

    Matching is a case-insensitive substring search, so any capitalisation of
    the label words (``"positive"``, ``"Positive"``, ``"POSITIVE"``, ...) is
    recognised. ``positive`` is checked first and therefore wins when both
    words occur in the text.

    Args:
        x: Text to classify, e.g. a gold label or a model generation.

    Returns:
        ``'positive'``, ``'negative'``, or ``'neutral'`` when neither label
        word is present.
    """
    text = x.lower()
    if 'positive' in text:
        return 'positive'
    if 'negative' in text:
        return 'negative'
    return 'neutral'
|
|
|
|
|
def vote_output(x):
    """Combine the per-template predictions of one row into a single label.

    For every entry of the module-level ``templates`` list the row's
    ``out_text_<i>`` value is lowercased, mapped through ``change_target``
    and tallied. Only the positive and negative tallies decide the result:
    the larger one wins and a tie yields ``'neutral'``. Neutral votes are
    counted but do not influence the outcome.

    Args:
        x: Row-like object indexable by ``"out_text_<i>"`` for each template
            index ``i``; each value must be a string.

    Returns:
        ``'positive'``, ``'negative'`` or ``'neutral'``.
    """
    tally = dict.fromkeys(('positive', 'negative', 'neutral'), 0)
    for idx, _ in enumerate(templates):
        label = change_target(x[f'out_text_{idx}'].lower())
        tally[label] += 1

    margin = tally['positive'] - tally['negative']
    if margin > 0:
        return 'positive'
    if margin < 0:
        return 'negative'
    return 'neutral'
|
|
|
def test_fpb(args, model, tokenizer, prompt_fun=None):
    """Evaluate ``model`` on the held-out split of the phrasebank dataset.

    The dataset is loaded from ``data/financial_phrasebank-sentences_50agree/``
    (relative to this file's parent directory), split with a fixed seed, and
    each test sentence is turned into an instruction prompt via
    ``format_example``. Generations are reduced to a sentiment label with
    ``change_target`` and compared against the gold labels. Accuracy and
    macro / micro / weighted F1 are printed.

    Args:
        args: Namespace-like object; only ``args.batch_size`` is read.
        model: Generation model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``; called on a list of prompts
            and used to decode the generated sequences.
        prompt_fun: Optional callable applied row-wise to build a custom
            instruction per sample. When ``None`` a fixed instruction is used.

    Returns:
        The evaluation ``pandas.DataFrame`` with the added ``out_text``,
        ``new_target`` and ``new_out`` columns.
    """
    batch_size = args.batch_size

    instructions = load_from_disk(Path(__file__).parent.parent / "data/financial_phrasebank-sentences_50agree/")
    instructions = instructions["train"]
    instructions = instructions.train_test_split(seed=42)['test']
    instructions = instructions.to_pandas()
    instructions.columns = ["input", "output"]
    instructions["output"] = instructions["output"].apply(lambda x: dic[x])

    if prompt_fun is None:
        instructions["instruction"] = "What is the sentiment of this news? Please choose an answer from {negative/neutral/positive}."
    else:
        instructions["instruction"] = instructions.apply(prompt_fun, axis=1)

    instructions[["context", "target"]] = instructions.apply(format_example, axis=1, result_type="expand")

    print(f"\n\nPrompt example:\n{instructions['context'][0]}\n\n")

    context = instructions['context'].tolist()

    # Ceiling division: the former ``len // batch_size + 1`` scheduled an
    # extra, empty batch whenever the sample count was an exact multiple of
    # the batch size, and that empty list was then handed to the tokenizer.
    total_steps = (len(context) + batch_size - 1) // batch_size
    print(f"Total len: {len(context)}. Batchsize: {batch_size}. Total steps: {total_steps}")

    out_text_list = []
    for start in tqdm(range(0, len(context), batch_size)):
        tmp_context = context[start:start + batch_size]
        tokens = tokenizer(tmp_context, return_tensors='pt', padding=True, max_length=512, return_token_type_ids=False)
        # Follow the model's device (as test_fpb_mlt does) instead of
        # unconditionally moving the inputs to the default CUDA device.
        tokens = {key: value.to(model.device) for key, value in tokens.items()}
        res = model.generate(**tokens, max_length=512, eos_token_id=tokenizer.eos_token_id)
        res_sentences = [tokenizer.decode(seq, skip_special_tokens=True) for seq in res]

        for sentence in res_sentences:
            # Keep the text that follows the first "Answer: " marker. If the
            # decoded text does not contain the marker, record an empty
            # answer (scored as neutral) rather than raising IndexError.
            pieces = sentence.split("Answer: ")
            out_text_list.append(pieces[1] if len(pieces) > 1 else "")
        torch.cuda.empty_cache()

    instructions["out_text"] = out_text_list
    instructions["new_target"] = instructions["target"].apply(change_target)
    instructions["new_out"] = instructions["out_text"].apply(change_target)

    acc = accuracy_score(instructions["new_target"], instructions["new_out"])
    f1_macro = f1_score(instructions["new_target"], instructions["new_out"], average="macro")
    f1_micro = f1_score(instructions["new_target"], instructions["new_out"], average="micro")
    f1_weighted = f1_score(instructions["new_target"], instructions["new_out"], average="weighted")

    print(f"Acc: {acc}. F1 macro: {f1_macro}. F1 micro: {f1_micro}. F1 weighted (BloombergGPT): {f1_weighted}. ")

    return instructions
|
|
|
|
|
def test_fpb_mlt(args, model, tokenizer):
    """Evaluate ``model`` on the phrasebank test split with every template.

    Neutral samples are dropped, so the task is binary (positive/negative).
    For each entry of the module-level ``templates`` list the prompts are
    rebuilt, the model generates an answer per sample, and the answers are
    mapped to labels with ``change_target``. The per-template predictions are
    merged by ``vote_output`` into ``new_out``. The resulting frame is written
    to ``tmp.csv`` in the current working directory, and accuracy / F1 scores
    are printed for every template column and for the voted column.

    Args:
        args: Namespace-like object; ``args.batch_size`` and
            ``args.max_length`` are read.
        model: Generation model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        The evaluation ``pandas.DataFrame`` including the ``out_text_<i>``
        columns and the voted ``new_out`` column.
    """
    batch_size = args.batch_size

    dataset = load_from_disk(Path(__file__).parent.parent / 'data/financial_phrasebank-sentences_50agree/')
    dataset = dataset["train"]
    dataset = dataset.train_test_split(seed=42)['test']
    dataset = dataset.to_pandas()
    dataset.columns = ["input", "output"]
    dataset["output"] = dataset["output"].apply(lambda x: dic[x])
    dataset["text_type"] = 'news'

    dataset["output"] = dataset["output"].apply(change_target)
    # Copy after filtering so later column assignments operate on an
    # independent frame rather than on a slice of the unfiltered one.
    dataset = dataset[dataset["output"] != 'neutral'].copy()

    out_texts_list = [[] for _ in range(len(templates))]

    def collate_fn(batch):
        # Tokenize the prompts of one batch into padded tensors.
        return tokenizer(
            [f["context"] for f in batch], return_tensors='pt',
            padding=True, max_length=args.max_length,
            return_token_type_ids=False
        )

    for i, template in enumerate(templates):
        # Start each template from the base columns only (again as a copy).
        dataset = dataset[['input', 'output', "text_type"]].copy()
        dataset["instruction"] = dataset['text_type'].apply(lambda x: template.format(type=x) + "\nOptions: positive, negative")

        dataset[["context", "target"]] = dataset.apply(format_example, axis=1, result_type="expand")

        dataloader = DataLoader(Dataset.from_pandas(dataset), batch_size=batch_size, collate_fn=collate_fn, shuffle=False)

        for idx, inputs in enumerate(tqdm(dataloader)):
            inputs = {key: value.to(model.device) for key, value in inputs.items()}
            # Only max_new_tokens is passed: supplying max_length as well is a
            # conflicting length limit that generate() rejects or warns about
            # depending on the transformers version.
            res = model.generate(**inputs, do_sample=False, eos_token_id=tokenizer.eos_token_id, max_new_tokens=10)
            res_sentences = [tokenizer.decode(seq, skip_special_tokens=True) for seq in res]
            tqdm.write(f'{idx}: {res_sentences[0]}')

            for sentence in res_sentences:
                # Keep the text after the first "Answer: " marker; a decoded
                # text without the marker counts as an empty answer instead
                # of raising IndexError.
                pieces = sentence.split("Answer: ")
                out_texts_list[i].append(pieces[1] if len(pieces) > 1 else "")
            torch.cuda.empty_cache()

    for i, texts in enumerate(out_texts_list):
        dataset[f"out_text_{i}"] = [change_target(text) for text in texts]

    dataset["new_out"] = dataset.apply(vote_output, axis=1, result_type="expand")
    dataset.to_csv('tmp.csv')

    for k in [f"out_text_{i}" for i in range(len(templates))] + ["new_out"]:
        acc = accuracy_score(dataset["target"], dataset[k])
        f1_macro = f1_score(dataset["target"], dataset[k], average="macro")
        f1_micro = f1_score(dataset["target"], dataset[k], average="micro")
        f1_weighted = f1_score(dataset["target"], dataset[k], average="weighted")

        print(f"Acc: {acc}. F1 macro: {f1_macro}. F1 micro: {f1_micro}. F1 weighted (BloombergGPT): {f1_weighted}. ")

    return dataset