# vlm_clone_2/llm2vec/experiments/test_word_task.py
# tuandunghcmut: Add files using upload-large-folder tool
# e9cd0c7 (verified)
import os
import sys
import logging
import argparse
from transformers import (
AutoTokenizer,
AutoConfig,
AutoModelForTokenClassification,
set_seed,
HfArgumentParser,
)
import torch
from datasets import load_dataset
import evaluate
import json
from tqdm import tqdm
from run_word_task import ModelForWordTask
from llm2vec import LLM2Vec
# Tag vocabularies, keyed by dataset name and then by task name.
# Each task maps a tag string to its integer class id; the id is simply the
# tag's position in the ordered tuple it is listed in below, so the order of
# these tuples must not be changed.
LABELS = {
    "conll2003": {
        task_name: {tag: class_id for class_id, tag in enumerate(ordered_tags)}
        for task_name, ordered_tags in (
            (
                "pos_tags",
                (
                    '"', "''", "#", "$", "(", ")", ",", ".", ":", "``",
                    "CC", "CD", "DT", "EX", "FW", "IN", "JJ", "JJR", "JJS", "LS",
                    "MD", "NN", "NNP", "NNPS", "NNS", "NN|SYM", "PDT", "POS",
                    "PRP", "PRP$", "RB", "RBR", "RBS", "RP", "SYM", "TO", "UH",
                    "VB", "VBD", "VBG", "VBN", "VBP", "VBZ", "WDT", "WP", "WP$",
                    "WRB",
                ),
            ),
            (
                "chunk_tags",
                (
                    "O",
                    "B-ADJP", "I-ADJP",
                    "B-ADVP", "I-ADVP",
                    "B-CONJP", "I-CONJP",
                    "B-INTJ", "I-INTJ",
                    "B-LST", "I-LST",
                    "B-NP", "I-NP",
                    "B-PP", "I-PP",
                    "B-PRT", "I-PRT",
                    "B-SBAR", "I-SBAR",
                    "B-UCP", "I-UCP",
                    "B-VP", "I-VP",
                ),
            ),
            (
                "ner_tags",
                (
                    "O",
                    "B-PER", "I-PER",
                    "B-ORG", "I-ORG",
                    "B-LOC", "I-LOC",
                    "B-MISC", "I-MISC",
                ),
            ),
        )
    }
}
def str2bool(v):
    """Convert a command-line style truth value into a ``bool``.

    Intended for use as an argparse ``type=`` callable.

    Args:
        v: A ``bool`` (returned unchanged) or a value whose string form is one
            of ``yes/true/t/y/1`` or ``no/false/f/n/0``. Matching ignores case
            and surrounding whitespace.

    Returns:
        The parsed boolean.

    Raises:
        argparse.ArgumentTypeError: If the value is not a recognised spelling.
    """
    if isinstance(v, bool):
        return v
    # Normalise through str() so that non-string input is rejected with the
    # same ArgumentTypeError as any other bad value instead of crashing on
    # a missing .lower(), and strip padding such as "true " or "1\n".
    text = str(v).strip().lower()
    if text in ("yes", "true", "t", "y", "1"):
        return True
    if text in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError("Boolean value expected.")
def align_labels_to_tokens(word_ids, word_labels, retroactive_labels):
    """Spread word-level labels over the tokens of one tokenized example.

    Only the first token of every word receives that word's label; special or
    padding tokens (word id ``None``) and the remaining sub-tokens of a word
    receive ``-100``, the value that is filtered out before scoring.

    Args:
        word_ids: Per-token word index for one example, ``None`` for tokens
            that belong to no word.
        word_labels: Label id of each word, indexed by word index.
        retroactive_labels: ``"same_token"`` keeps labels on the token itself;
            ``"next_token"`` shifts labels and word ids one position to the
            left, so position ``t`` carries what belonged to position ``t+1``.

    Returns:
        A ``(label_ids, token_word_ids)`` tuple of lists. In
        ``token_word_ids`` every ``None`` is replaced by ``-1``.

    Raises:
        ValueError: If ``retroactive_labels`` is not a supported mode.
    """
    if retroactive_labels not in ("same_token", "next_token"):
        raise ValueError(
            f"retroactive_labels {retroactive_labels} is not implemented."
        )

    label_ids = []
    previous_word_idx = None
    for word_idx in word_ids:
        if word_idx is None or word_idx == previous_word_idx:
            label_ids.append(-100)
        else:
            label_ids.append(word_labels[word_idx])
        previous_word_idx = word_idx

    token_word_ids = list(word_ids)
    if retroactive_labels == "next_token":
        # Drop the first position and pad the end so the length is unchanged.
        label_ids.append(-100)
        label_ids = label_ids[1:]
        token_word_ids = token_word_ids[1:] + [None]

    token_word_ids = [-1 if w is None else w for w in token_word_ids]
    return label_ids, token_word_ids


def main():
    """Evaluate a token-classification model on a dataset's test split.

    Parses the command line (optionally overridden by a JSON config file),
    loads either an LLM2Vec-backed ``ModelForWordTask`` plus a saved
    classifier (``--model_class custom``) or an
    ``AutoModelForTokenClassification`` (``--model_class auto``), runs it over
    the ``test`` split, and writes the micro-averaged precision over all
    labelled tokens to ``<output_dir>/result_summary.json``.
    """
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument("--model_class", default="custom", type=str)
    parser.add_argument("--model_name_or_path", default=None, type=str)
    parser.add_argument(
        "--peft_addr",
        default=None,
        type=str,
        help="The dir address where adapter_model.bin is saved.",
    )
    parser.add_argument(
        "--cls_addr",
        default=None,
        type=str,
        help="The dir address where classifier is saved.",
    )
    parser.add_argument("--bidirectional", default=True, type=str2bool)
    parser.add_argument("--merge_subwords", default=True, type=str2bool)
    parser.add_argument("--output_dir", default=None, type=str)
    parser.add_argument("--classifier_dropout", default=0.1, type=float)
    parser.add_argument(
        "--attn_implementation",
        default="sdpa",
        type=str,
        choices=["sdpa", "eager", "flash_attention_2"],
    )
    parser.add_argument(
        "--torch_dtype",
        default=None,
        type=str,
        choices=["auto", "bfloat16", "float16", "float32"],
    )
    parser.add_argument(
        "--retroactive_labels",
        default="next_token",
        type=str,
        choices=["next_token", "same_token"],
    )
    parser.add_argument("--dataset_name", default=None, type=str)
    parser.add_argument(
        "--task", default=None, type=str, choices=["pos_tags", "chunk_tags", "ner_tags"]
    )
    parser.add_argument("--max_seq_length", default=1024, type=int)
    parser.add_argument("--batch_size", default=32, type=int)
    parser.add_argument("--seed", default=32, type=int)
    parser.add_argument("--config_file", default=None, type=str)
    args = parser.parse_args()

    if args.config_file is not None:
        # Keys from the JSON file override the parsed command-line values.
        # They are copied in as-is: argparse `type`/`choices` are NOT applied.
        with open(os.path.abspath(args.config_file)) as config_fp:
            vars(args).update(json.load(config_fp))

    # Validate up front with explicit errors; `assert` would be stripped under
    # `python -O`, and missing paths would otherwise fail later with an
    # unhelpful TypeError.
    if args.output_dir is None:
        parser.error(
            "If you want to evaluate a model, you have to provide the output_dir"
        )
    if not args.model_name_or_path:
        parser.error("--model_name_or_path is required.")
    if args.model_class == "auto" and args.merge_subwords:
        parser.error("--merge_subwords must be false when --model_class is auto.")
    if args.model_class == "custom" and args.cls_addr is None:
        parser.error("--cls_addr is required when --model_class is custom.")
    if args.dataset_name not in LABELS or args.task not in LABELS[args.dataset_name]:
        parser.error(f"LABELS[{args.dataset_name}][{args.task}] is not defined.")

    os.makedirs(args.output_dir, exist_ok=True)
    set_seed(args.seed)

    tokenizer_kwargs = {}
    if "gpt" in args.model_name_or_path:
        tokenizer_kwargs["add_prefix_space"] = True
    tokenizer = AutoTokenizer.from_pretrained(
        args.model_name_or_path, **tokenizer_kwargs
    )
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token = tokenizer.eos_token
    if args.model_class == "custom":
        # The custom model is fed per-token word ids through `token_type_ids`
        # (see tokenize_and_align_labels below).
        tokenizer.model_input_names.append("token_type_ids")

    label2id = LABELS[args.dataset_name][args.task]
    id2label = {i: lab for lab, i in label2id.items()}

    if args.model_class == "custom":
        config_kwargs = {
            "num_labels": len(label2id),
            "id2label": id2label,
            "label2id": label2id,
            "classifier_dropout": args.classifier_dropout,
        }
        config = AutoConfig.from_pretrained(args.model_name_or_path, **config_kwargs)
        # Set the values explicitly as well, in case from_pretrained did not
        # attach every extra keyword to the config object.
        for key, value in config_kwargs.items():
            setattr(config, key, value)

        torch_dtype = (
            args.torch_dtype
            if args.torch_dtype in ["auto", None]
            else getattr(torch, args.torch_dtype)
        )
        l2v = LLM2Vec.from_pretrained(
            base_model_name_or_path=args.model_name_or_path,
            enable_bidirectional=args.bidirectional,
            peft_model_name_or_path=args.peft_addr,
            merge_peft=False,
            torch_dtype=torch_dtype,
            attn_implementation=args.attn_implementation,
        )
        model = ModelForWordTask(
            model=l2v.model,
            merge_subwords=args.merge_subwords,
            config=config,
            torch_dtype=torch_dtype,
        )

        classifier_path = os.path.join(args.cls_addr, "classifier.pt")
        if not os.path.exists(classifier_path):
            raise ValueError(f"classifier does not exist in {classifier_path}")
        print(f"Loading classifier from {classifier_path}")
        # NOTE(security): torch.load unpickles the file, which can execute
        # arbitrary code. Only point --cls_addr at checkpoints you trust.
        model.classifier = torch.load(classifier_path)
    elif args.model_class == "auto":
        model = AutoModelForTokenClassification.from_pretrained(
            args.model_name_or_path,
            num_labels=len(label2id),
            id2label=id2label,
            label2id=label2id,
        )
    else:
        raise ValueError(
            f"{args.model_class} is not implemented. Only auto and custom model_class options are valid."
        )

    model = model.cuda()
    # Inference only: switch off dropout and other train-time behaviour so the
    # reported score is deterministic.
    model.eval()

    raw_datasets = load_dataset(args.dataset_name, split="test")

    def tokenize_and_align_labels(examples):
        """Tokenize a batch of pre-split sentences and attach token labels."""
        tokenized_inputs = tokenizer(
            examples["tokens"],
            truncation=True,
            is_split_into_words=True,
            padding="max_length",
            max_length=args.max_seq_length,
            return_tensors="pt",
        )
        labels = []
        words = []
        for i, word_labels in enumerate(examples[args.task]):
            label_ids, token_word_ids = align_labels_to_tokens(
                tokenized_inputs.word_ids(batch_index=i),
                word_labels,
                args.retroactive_labels,
            )
            labels.append(label_ids)
            words.append(token_word_ids)
        tokenized_inputs["labels"] = torch.tensor(labels)
        if args.model_class == "custom":
            tokenized_inputs["token_type_ids"] = words
        return tokenized_inputs

    tokenized_dataset = raw_datasets.map(
        tokenize_and_align_labels,
        batched=True,
        remove_columns=list(LABELS[args.dataset_name].keys()) + ["tokens", "id"],
    )

    # Collect per-batch results on the CPU and concatenate once at the end.
    prediction_chunks = []
    label_chunks = []
    with torch.no_grad():
        for batch_begin in tqdm(range(0, len(tokenized_dataset), args.batch_size)):
            # Slice the dataset once per step; every slice materialises the
            # whole batch, so repeating it per column is wasted work.
            batch = tokenized_dataset[batch_begin : batch_begin + args.batch_size]
            features = {
                "input_ids": torch.tensor(batch["input_ids"]).to(model.device),
                "attention_mask": torch.tensor(batch["attention_mask"]).to(
                    model.device
                ),
            }
            if "token_type_ids" in batch:
                features["token_type_ids"] = torch.tensor(
                    batch["token_type_ids"]
                ).to(model.device)

            logits = model(**features).logits
            prediction_chunks.append(torch.argmax(logits, dim=-1).cpu())
            label_chunks.append(torch.tensor(batch["labels"]))

    if not prediction_chunks:
        raise ValueError(
            f"The test split of {args.dataset_name} is empty; nothing to evaluate."
        )

    predictions = torch.cat(prediction_chunks)
    labels = torch.cat(label_chunks)
    # Score only positions that carry a real label (-100 marks the rest).
    labelled = labels != -100

    precision_metric = evaluate.load("precision")
    metrics = precision_metric.compute(
        references=labels[labelled],
        predictions=predictions[labelled],
        average="micro",
    )
    with open(os.path.join(args.output_dir, "result_summary.json"), "w") as f:
        json.dump(metrics, f)
    print(metrics)


if __name__ == "__main__":
    main()