| |
|
|
| import torch |
| from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
| from datasets import load_dataset |
| from tqdm import tqdm |
| import re |
| import string |
| import collections |
| import numpy as np |
| import json |
|
|
| from .config import MainConfig, convert_to_trainer_args |
| from smpeft.sama import SamaConfig |
| from smpeft import get_peft_model, PeftModel |
| import draccus |
| import random |
| import transformers |
|
|
# Module-level defaults for the evaluation script.
BATCH_SIZE = 32
# Not referenced in the visible part of this file.
IGNORE_INDEX = -100
# Not referenced in the visible part of this file.
MAX_NEW_TOKENS = 50

# Prompt wrapper with a single ``{instruction}`` placeholder.
# NOTE(review): the wording contains typos ("an passage", "coresponding") and
# has no sentence break after "task"; it is kept verbatim because it is a
# runtime string. generate_batch defines its own local template, so confirm
# whether this one is still needed.
PROMPT_TEMPLATE = (
    "Below is an passage followed by a coresponding question that describes a task "
    "Write a response that appropriately completes the request with your answer.\n\n"
    "### Instruction:\n{instruction}\n\n### Response:"
)
|
|
def normalize_answer(s):
    """Return *s* lower-cased, without punctuation, articles or extra whitespace.

    The steps run in a fixed order: lower-case the text, delete every
    character listed in ``string.punctuation``, replace the whole words
    "a", "an" and "the" with a space, then collapse whitespace runs into
    single spaces (which also trims both ends).
    """
    lowered = s.lower()
    # One translate pass deletes all ASCII punctuation characters.
    without_punct = lowered.translate(str.maketrans('', '', string.punctuation))
    without_articles = re.sub(r'\b(a|an|the)\b', ' ', without_punct, flags=re.UNICODE)
    return ' '.join(without_articles.split())
|
|
|
|
def f1_score(prediction, ground_truth):
    """Token-level F1 between a prediction and a single reference answer.

    Both strings are passed through ``normalize_answer`` and split on
    whitespace. The overlap is the multiset intersection of the two token
    lists. Returns ``0`` when no token is shared, otherwise the harmonic
    mean of precision and recall.
    """
    pred_toks = normalize_answer(prediction).split()
    gold_toks = normalize_answer(ground_truth).split()

    # Counter & Counter keeps the minimum count of each shared token.
    shared = collections.Counter(pred_toks) & collections.Counter(gold_toks)
    overlap = sum(shared.values())
    if overlap == 0:
        return 0

    precision = 1.0 * overlap / len(pred_toks)
    recall = 1.0 * overlap / len(gold_toks)
    return (2 * precision * recall) / (precision + recall)
|
|
def exact_match_score(prediction, ground_truth):
    """Return True when both strings are equal after ``normalize_answer``."""
    normalized_prediction = normalize_answer(prediction)
    normalized_truth = normalize_answer(ground_truth)
    return normalized_prediction == normalized_truth
|
|
def metric_max_over_ground_truths(metric_fn, prediction, ground_truths):
    """Return the best ``metric_fn`` score of *prediction* over all references.

    DROP often has multiple valid answer spans, so the prediction is scored
    against each one and the maximum is kept.

    Args:
        metric_fn: Callable ``(prediction, ground_truth) -> score``.
        prediction: The predicted answer string.
        ground_truths: Iterable of reference answer strings.

    Returns:
        The highest score among the references, or ``0`` when
        ``ground_truths`` is empty (a prediction cannot match a missing
        reference, and ``max()`` would otherwise raise ``ValueError``).
    """
    return max(
        (metric_fn(prediction, ground_truth) for ground_truth in ground_truths),
        default=0,
    )
|
|
def set_seed(seed: int):
    """Seed every random number generator this script touches with *seed*.

    The seeders are called in this order: Python's ``random``, NumPy, torch
    (CPU), torch (all CUDA devices), and ``transformers.set_seed``.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        transformers.set_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
|
|
def generate_batch(
    model,
    tokenizer,
    batch_samples,
    *,
    max_length=1024,
    max_new_tokens=20,
    repetition_penalty=1.2,
):
    """Greedy-decode one answer per sample in *batch_samples*.

    Args:
        model: Model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer used to encode the prompts and decode the output.
        batch_samples: Mapping-like batch with parallel ``'passage'`` and
            ``'question'`` columns.
        max_length: Maximum number of prompt tokens kept by the tokenizer.
        max_new_tokens: Maximum number of tokens generated per prompt.
        repetition_penalty: Repetition penalty forwarded to ``generate``.

    Returns:
        A list with one stripped answer string per sample, in input order.
    """
    # Lower-case local name so the module-level PROMPT_TEMPLATE (which has a
    # different wording) is not shadowed.
    prompt_template = (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Response:"
    )

    prompts = [
        prompt_template.format(instruction=f"Passage: {passage}\nQuestion: {question}")
        for passage, question in zip(batch_samples['passage'], batch_samples['question'])
    ]
    if not prompts:
        # Nothing to generate; avoid calling the tokenizer with an empty batch.
        return []

    # NOTE(review): which end gets truncated depends on the tokenizer's
    # configuration. If it truncates on the right, prompts longer than
    # max_length lose the question and the "### Response:" marker - confirm.
    inputs = tokenizer(
        prompts,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=max_length,
    ).to(model.device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            pad_token_id=tokenizer.pad_token_id,
            repetition_penalty=repetition_penalty,
        )

    # Assumes a decoder-only model whose generate() output begins with the
    # padded prompt tokens, so dropping the first `prompt_width` columns
    # leaves only the newly generated tokens. This relies on left padding,
    # which main() requests when it builds the tokenizer.
    prompt_width = inputs.input_ids.shape[1]
    continuations = outputs[:, prompt_width:]

    decoded = tokenizer.batch_decode(continuations, skip_special_tokens=True)
    return [text.strip() for text in decoded]
|
|
def _ensure_pad_token(tokenizer):
    """Give *tokenizer* a PAD token, falling back to UNK and then EOS.

    Raises:
        ValueError: If the tokenizer has no PAD, UNK or EOS token, because
            batched prompts cannot be padded in that case.
    """
    if tokenizer.pad_token is not None:
        return
    if tokenizer.unk_token_id is not None:
        tokenizer.pad_token_id = tokenizer.unk_token_id
        tokenizer.pad_token = tokenizer.unk_token
        print("Set PAD token to UNK token.")
    elif tokenizer.eos_token_id is not None:
        tokenizer.pad_token_id = tokenizer.eos_token_id
        tokenizer.pad_token = tokenizer.eos_token
        print("Set PAD token to EOS token.")
    else:
        raise ValueError("Tokenizer has no PAD, UNK or EOS token; cannot pad batched prompts")


@draccus.wrap()
def main(mainCfg: MainConfig):
    """Evaluate an adapter-merged causal LM on DROP and save EM/F1 results.

    Steps: seed the RNGs, load the base model and tokenizer named by
    ``mainCfg.model.model_name``, merge the adapter found under
    ``<adapter_path>/ft2``, generate answers for the first
    ``mainCfg.data.total_test_samples`` rows of the ``validation`` split,
    score them with exact match and token F1, print a summary and write
    ``<adapter_path>/drop_evaluation_results.json``.

    Raises:
        KeyError: If ``mainCfg.model.adapter_path`` is ``None``.
        ValueError: If no PAD token can be chosen or the model config does
            not accept the tokenizer's ``pad_token_id``.
    """
    print('='*120)
    set_seed(mainCfg.seed)

    # Validate the adapter path first so a bad config fails before the
    # (expensive) base-model load.
    adapter_path = mainCfg.model.adapter_path
    if adapter_path is None:
        raise KeyError('wrong adapter path: ', adapter_path)

    model = AutoModelForCausalLM.from_pretrained(
        mainCfg.model.model_name, device_map="auto", dtype=torch.float16
    )
    # Left padding keeps every prompt flush against the generated tokens.
    tokenizer = AutoTokenizer.from_pretrained(mainCfg.model.model_name, padding_side='left')
    _ensure_pad_token(tokenizer)

    model.config.pad_token_id = tokenizer.pad_token_id
    if model.config.pad_token_id != tokenizer.pad_token_id:
        raise ValueError("Failed to sync pad_token_id between tokenizer and model config")

    model = PeftModel.from_pretrained(model, adapter_path + "/ft2", is_trainable=True)
    model = model.merge_and_unload()
    model.eval()

    full_drop_test = load_dataset(path=mainCfg.data.path, split='validation')
    # Clamp the request so asking for more rows than the split holds does
    # not make select() fail.
    requested = mainCfg.data.total_test_samples
    num_samples = min(requested, len(full_drop_test))
    if num_samples < requested:
        print(f"Requested {requested} samples but the split only has {num_samples}; using {num_samples}.")
    test_dataset_raw = full_drop_test.select(range(num_samples))
    num_total = len(test_dataset_raw)

    results = []
    total_em = 0
    total_f1 = 0

    print(f"Starting Inference on {num_total} samples...")

    batch_size = mainCfg.trainer_args.per_device_eval_batch_size
    for start in tqdm(range(0, num_total, batch_size)):
        stop = min(start + batch_size, num_total)
        batch_samples = test_dataset_raw.select(range(start, stop))

        batch_preds = generate_batch(model, tokenizer, batch_samples)

        # Iterating the batch yields one row dict per sample, in the same
        # order as the predictions.
        for original_item, pred in zip(batch_samples, batch_preds):
            ground_truths = original_item['answers_spans']['spans']

            em = metric_max_over_ground_truths(exact_match_score, pred, ground_truths)
            f1 = metric_max_over_ground_truths(f1_score, pred, ground_truths)

            total_em += em
            total_f1 += f1

            results.append({
                "id": original_item["query_id"],
                "prediction": pred,
                "ground_truths": ground_truths,
                "em": em,
                "f1": f1
            })

    # Guard against an empty evaluation set (would divide by zero).
    avg_em = 100.0 * total_em / num_total if num_total else 0.0
    avg_f1 = 100.0 * total_f1 / num_total if num_total else 0.0

    print("\n" + "="*30)
    print("RESULTS")
    print("="*30)
    print(f"Total Samples: {num_total}")
    print(f"Exact Match (EM): {avg_em:.2f}%")
    print(f"F1 Score : {avg_f1:.2f}%")
    print("="*30)

    output_file = adapter_path + "/drop_evaluation_results.json"
    with open(output_file, "w", encoding='utf-8') as f:
        json.dump({
            "metrics": {"EM": avg_em, "F1": avg_f1},
            "details": results
        }, f, indent=2, ensure_ascii=False)
    print(f"Detailed results saved to {output_file}")
| |
# Start the evaluation only when this module is the program entry point.
if __name__ == "__main__":
    main()