## FB 124M
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
from tqdm import tqdm
import re
import string
import collections
import numpy as np
import json
from .config import MainConfig, convert_to_trainer_args
from smpeft.sama import SamaConfig #RotationTuner
from smpeft import get_peft_model, PeftModel
import draccus
import random
import transformers
# Module-level defaults. In the code visible in this file, main() takes its
# batch size from the config and generate_batch() builds prompts from its own
# template, so these act as fallbacks/reference values.
BATCH_SIZE = 32
IGNORE_INDEX = -100
MAX_NEW_TOKENS = 50

# Instruction-style prompt; ``{instruction}`` is the only placeholder.
# The wording is kept exactly as-is because prompt text is model-facing.
PROMPT_TEMPLATE = (
    "Below is an passage followed by a coresponding question that describes a task "
    + "Write a response that appropriately completes the request with your answer.\n\n"
    + "### Instruction:\n{instruction}\n\n### Response:"
)
def normalize_answer(s):
    """Return *s* in the canonical form used when comparing answers.

    Steps, in order: lower-case the text, delete every character found in
    ``string.punctuation``, replace the standalone words "a", "an" and "the"
    with a space, then collapse all whitespace runs into single spaces
    (which also trims both ends).
    """
    lowered = s.lower()
    # One pass that deletes all punctuation characters.
    without_punct = lowered.translate(str.maketrans('', '', string.punctuation))
    # Articles are replaced by a space (not removed) so that neighbouring
    # words never get glued together.
    without_articles = re.sub(r'\b(a|an|the)\b', ' ', without_punct, flags=re.UNICODE)
    return ' '.join(without_articles.split())
def f1_score(prediction, ground_truth):
    """Token-overlap F1 between a prediction and a single reference answer.

    Both strings are passed through ``normalize_answer`` and split on
    whitespace. Overlap is counted as a multiset intersection, so a token
    repeated in both strings is credited once per shared occurrence.

    Returns ``0`` when no token is shared, otherwise the harmonic mean of
    token precision and token recall.
    """
    pred_tokens = normalize_answer(prediction).split()
    gold_tokens = normalize_answer(ground_truth).split()

    overlap = collections.Counter(pred_tokens) & collections.Counter(gold_tokens)
    shared = sum(overlap.values())
    if not shared:
        # Also covers the case where either side normalizes to nothing.
        return 0

    precision = shared / len(pred_tokens)
    recall = shared / len(gold_tokens)
    return (2 * precision * recall) / (precision + recall)
def exact_match_score(prediction, ground_truth):
    """Return True when both strings are identical after ``normalize_answer``."""
    normalized_prediction = normalize_answer(prediction)
    normalized_reference = normalize_answer(ground_truth)
    return normalized_prediction == normalized_reference
def metric_max_over_ground_truths(metric_fn, prediction, ground_truths):
    """Score *prediction* against every reference and keep the best score.

    DROP often has multiple valid answer spans, so a prediction is credited
    with the highest score it achieves against any of them.

    Args:
        metric_fn: Callable ``(prediction, ground_truth) -> score``.
        prediction: The predicted answer, passed unchanged to ``metric_fn``.
        ground_truths: Iterable of reference answers.

    Returns:
        The maximum value returned by ``metric_fn``. When ``ground_truths``
        is empty the result is ``0`` (no reference, no credit) instead of
        the ``ValueError`` that ``max()`` raises on an empty sequence.
    """
    return max(
        (metric_fn(prediction, ground_truth) for ground_truth in ground_truths),
        default=0,
    )
def set_seed(seed: int):
    """Seed every random number source this script relies on with *seed*.

    The seeders are called in this order: Python's ``random``, NumPy, torch,
    torch CUDA (all devices), and finally ``transformers.set_seed``.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        transformers.set_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
def generate_batch(
    model,
    tokenizer,
    batch_samples,
    *,
    max_new_tokens: int = 20,
    max_length: int = 1024,
    repetition_penalty: float = 1.2,
):
    """Generate one answer string per sample in *batch_samples*.

    Each sample's passage and question are wrapped in an instruction-style
    prompt, the prompts are tokenized together as one padded batch, and the
    model decodes greedily. Only the tokens that follow the prompt are
    decoded and returned.

    Args:
        model: Object exposing ``.device`` and ``.generate(...)``
            (presumably a Hugging Face causal LM - confirm with caller).
        tokenizer: Callable tokenizer that also provides ``pad_token_id``
            and ``batch_decode``.
        batch_samples: Column-indexable batch with ``'passage'`` and
            ``'question'`` entries of equal length.
        max_new_tokens: Upper bound on generated tokens per sample.
        max_length: Token budget the prompts are truncated to.
        repetition_penalty: Forwarded to ``model.generate``.

    Returns:
        A list of stripped answer strings, one per sample, in input order.
    """
    # Local template (deliberately not the module-level PROMPT_TEMPLATE).
    prompt_template = (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Response:"
    )
    prompts = [
        prompt_template.format(instruction=f"Passage: {passage}\nQuestion: {question}")
        for passage, question in zip(batch_samples['passage'], batch_samples['question'])
    ]

    # NOTE(review): if the tokenizer truncates on the right (the usual
    # default), an over-long passage would cut off the question and the
    # "### Response:" suffix - confirm tokenizer.truncation_side.
    inputs = tokenizer(
        prompts,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=max_length,
    ).to(model.device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,  # Greedy decoding
            pad_token_id=tokenizer.pad_token_id,
            repetition_penalty=repetition_penalty,
        )

    # Drop the first `prompt_width` positions of every row so that only the
    # tokens following the prompt are decoded.
    prompt_width = inputs.input_ids.shape[1]
    generated_tokens = outputs[:, prompt_width:]
    decoded_preds = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
    return [text.strip() for text in decoded_preds]
@draccus.wrap()
def main(mainCfg: MainConfig):
    """Evaluate an adapter-merged causal LM on a QA split and report EM / F1.

    Workflow: seed the RNGs, load the base model and tokenizer, make sure a
    pad token exists, load the adapter from ``<adapter_path>/ft2`` and merge
    it, generate answers batch by batch, grade them with exact match and
    token F1 (best score over all reference spans), print the averages and
    write per-sample details to
    ``<adapter_path>/drop_evaluation_results.json``.

    Args:
        mainCfg: Run configuration. Fields read here: ``seed``,
            ``model.model_name``, ``model.adapter_path``, ``data.path``,
            ``data.total_test_samples`` and
            ``trainer_args.per_device_eval_batch_size``.

    Raises:
        KeyError: If ``mainCfg.model.adapter_path`` is ``None``.
        ValueError: If the pad token id cannot be mirrored into the model
            config.
    """
    print('=' * 120)
    set_seed(mainCfg.seed)
    # print(draccus.dump(mainCfg, default_flow_style=False))

    # Fail fast: the adapter path is needed both for loading the adapter and
    # for the output file, so check it before paying for the model load.
    adapter_path = mainCfg.model.adapter_path
    if adapter_path is None:
        raise KeyError('wrong adapter path: ', adapter_path)

    model = AutoModelForCausalLM.from_pretrained(
        mainCfg.model.model_name, device_map="auto", dtype=torch.float16
    )
    tokenizer = AutoTokenizer.from_pretrained(mainCfg.model.model_name, padding_side='left')

    # Batched generation needs a pad token; fall back to UNK, then EOS.
    if tokenizer.pad_token is None:
        if tokenizer.unk_token_id is not None:
            tokenizer.pad_token_id = tokenizer.unk_token_id
            tokenizer.pad_token = tokenizer.unk_token
            print("Set PAD token to UNK token.")
        elif tokenizer.eos_token_id is not None:
            tokenizer.pad_token_id = tokenizer.eos_token_id
            tokenizer.pad_token = tokenizer.eos_token
            print("Set PAD token to EOS token.")
    if model is not None:
        # Keep the model config in agreement with the tokenizer.
        model.config.pad_token_id = tokenizer.pad_token_id
        if model.config.pad_token_id != tokenizer.pad_token_id:
            raise ValueError("Failed to sync pad_token_id between tokenizer and model config")

    model = PeftModel.from_pretrained(model, adapter_path + "/ft2", is_trainable=True)
    model = model.merge_and_unload()  # Merge for speed
    model.eval()

    full_drop_test = load_dataset(path=mainCfg.data.path, split='validation')
    # Never request more rows than the split holds.
    sample_count = min(mainCfg.data.total_test_samples, len(full_drop_test))
    test_dataset_raw = full_drop_test.select(range(sample_count))

    results = []
    total_em = 0
    total_f1 = 0
    print(f"Starting Inference on {len(test_dataset_raw)} samples...")

    batch_size = mainCfg.trainer_args.per_device_eval_batch_size
    for start in tqdm(range(0, sample_count, batch_size)):
        stop = min(start + batch_size, sample_count)
        batch_samples = test_dataset_raw.select(range(start, stop))

        # generate
        batch_preds = generate_batch(model, tokenizer, batch_samples)

        # The batch already holds the rows, so pair them with the
        # predictions directly instead of re-indexing the full dataset.
        for original_item, pred in zip(batch_samples, batch_preds):
            ground_truths = original_item['answers_spans']['spans']

            # --- GRADE ---
            em = metric_max_over_ground_truths(exact_match_score, pred, ground_truths)
            f1 = metric_max_over_ground_truths(f1_score, pred, ground_truths)
            total_em += em
            total_f1 += f1
            results.append({
                "id": original_item["query_id"],
                "prediction": pred,
                "ground_truths": ground_truths,
                "em": em,
                "f1": f1
            })

    # 4. Final Statistics (guarded so an empty evaluation set reports 0
    # instead of dividing by zero)
    avg_em = 100.0 * total_em / sample_count if sample_count else 0.0
    avg_f1 = 100.0 * total_f1 / sample_count if sample_count else 0.0
    print("\n" + "="*30)
    print("RESULTS")
    print("="*30)
    print(f"Total Samples: {len(test_dataset_raw)}")
    print(f"Exact Match (EM): {avg_em:.2f}%")
    print(f"F1 Score : {avg_f1:.2f}%")
    print("="*30)

    # 5. Save details to JSON
    output_file = adapter_path + "/drop_evaluation_results.json"
    with open(output_file, "w", encoding='utf-8') as f:
        json.dump({
            "metrics": {"EM": avg_em, "F1": avg_f1},
            "details": results  # per-sample records collected above
        }, f, indent=2, ensure_ascii=False)
    print(f"Detailed results saved to {output_file}")
# Script entry point. main() is called without arguments here; its config
# argument is expected to be supplied by the draccus.wrap() decorator.
if __name__ == "__main__":
    main()