| import os |
| import pandas as pd |
| import numpy as np |
| import argparse |
| import datasets |
| import torch |
| import re |
| from thefuzz import process |
| from typing import List |
| from tqdm import tqdm |
| from transformers.trainer_utils import set_seed |
|
|
| from typing import Tuple, List, Union, Iterable |
|
|
| import numpy as np |
| import torch |
| import torch.nn.functional as F |
| from transformers import PreTrainedTokenizer |
| from transformers import logging |
| from transformers.generation import LogitsProcessor |
| from typing import TYPE_CHECKING, Optional, Tuple, Union, Callable, List |
# A conversation so far: one (user query, assistant response) pair per turn.
HistoryType = List[Tuple[str, str]]
# Token ids of a single sequence.
TokensType = List[int]
# Token ids of several sequences.
BatchTokensType = List[TokensType]
|
|
def make_context(
    tokenizer: PreTrainedTokenizer,
    query: str,
    history: List[Tuple[str, str]] = None,
    system: str = "",
    max_window_size: int = 6144,
    chat_format: str = "chatml",
):
    """Build the ChatML prompt for ``query``, as text and as token ids.

    The prompt is the system message, then as many of the most recent
    ``history`` turns as fit, then the new user message and an opened
    assistant message for the model to complete.

    Args:
        tokenizer: Must provide ``encode(text)`` returning a list of ids and
            the ``im_start_id`` / ``im_end_id`` attributes.
        query: The new user message.
        history: Earlier ``(user, assistant)`` turns, oldest first. ``None``
            is treated as no history.
        system: Content of the system message.
        max_window_size: Token budget. A turn is kept only while the system
            message plus all kept turns stay strictly below it; the final
            query is not counted against the budget.
        chat_format: Accepted but not consulted; ChatML is always produced.

    Returns:
        ``(raw_text, context_tokens)``: the prompt string and its token ids.
    """
    turns = [] if history is None else history

    start_tag, end_tag = "<|im_start|>", "<|im_end|>"
    start_ids = [tokenizer.im_start_id]
    end_ids = [tokenizer.im_end_id]
    newline_ids = tokenizer.encode("\n")

    def _render(role, content):
        """Return (text, token ids) of one complete message, markers included."""
        body_ids = tokenizer.encode(role) + newline_ids + tokenizer.encode(content)
        return (
            f"{start_tag}{role}\n{content}{end_tag}",
            start_ids + body_ids + end_ids,
        )

    system_text, system_ids = _render("system", system)

    kept_text = ""
    kept_ids = []
    # Walk backwards so the newest turns win when the budget runs out.
    for past_query, past_response in reversed(turns):
        user_text, user_ids = _render("user", past_query)
        reply_text, reply_ids = _render("assistant", past_response)
        turn_ids = newline_ids + user_ids + newline_ids + reply_ids

        if len(system_ids) + len(turn_ids) + len(kept_ids) >= max_window_size:
            break
        kept_ids = turn_ids + kept_ids
        kept_text = f"\n{user_text}\n{reply_text}" + kept_text

    query_text, query_ids = _render("user", query)
    # The assistant message is opened but deliberately left unterminated.
    assistant_open_ids = start_ids + tokenizer.encode("assistant") + newline_ids

    context_tokens = (
        system_ids + kept_ids + newline_ids + query_ids + newline_ids + assistant_open_ids
    )
    raw_text = system_text + kept_text + f"\n{query_text}\n{start_tag}assistant\n"

    return raw_text, context_tokens
|
|
def chat(
    model,
    tokenizer: PreTrainedTokenizer,
    query: str,
    history: Optional[HistoryType],
    system: str = "You are a helpful assistant.",
    append_history: bool = True
) -> Tuple[str, HistoryType]:
    """Run one chat turn and return the reply together with the history.

    Args:
        model: Object exposing ``generate(input_ids, ...)``; its ``device``
            attribute, when present, decides where the prompt tensor goes.
        tokenizer: Tokenizer handed to ``make_context`` and ``decode_tokens``.
        query: The user message for this turn.
        history: Earlier ``(query, response)`` turns, or ``None`` for a new
            conversation. A list passed in is appended to in place when
            ``append_history`` is true.
        system: System prompt placed at the start of the context.
        append_history: Whether to record this turn in ``history``.

    Returns:
        ``(response, history)``.
    """
    if history is None:
        history = []

    raw_text, context_tokens = make_context(
        tokenizer,
        query,
        history=history,
        system=system,
        max_window_size=6144,
        chat_format="chatml",
    )

    # Follow the model's own placement (it may have been loaded with
    # device_map="auto" or on CPU) instead of assuming CUDA; fall back to the
    # previous CUDA behaviour for objects that expose no `device`.
    device = getattr(model, "device", None)
    if device is None:
        device = "cuda"
    input_ids = torch.tensor([context_tokens]).to(device)

    # NOTE: no stop-word ids are passed to generate(); when generation stops
    # is left to the model's generation config, and decode_tokens() cuts the
    # text at the first <|im_start|>/<|im_end|> token afterwards.
    outputs = model.generate(
        input_ids,
        return_dict_in_generate=False,
    )

    response = decode_tokens(
        outputs[0],
        tokenizer,
        raw_text_len=len(raw_text),
        context_length=len(context_tokens),
        chat_format='chatml',
        verbose=False,
    )

    if append_history:
        history.append((query, response))

    return response, history
|
|
def decode_tokens(
    tokens: Union[torch.LongTensor, TokensType],
    tokenizer: PreTrainedTokenizer,
    raw_text_len: int,
    context_length: int,
    chat_format: str = "chatml",
    verbose: bool = False,
    return_end_reason: bool = False,
) -> str:
    """Decode the generated part of ``tokens`` as a ChatML reply.

    Tensors are converted to a plain list first; the work is then delegated
    to ``_decode_chatml`` with no stop words and with the tokenizer's
    ``im_start_id`` / ``im_end_id`` as end-of-dialog markers.

    Args:
        tokens: Prompt plus generated ids, as a tensor or a list.
        tokenizer: Tokenizer providing ``im_start_id`` and ``im_end_id``.
        raw_text_len: Character length of the prompt text.
        context_length: Token length of the prompt.
        chat_format: Accepted but not consulted.
        verbose: Forwarded to ``_decode_chatml``.
        return_end_reason: Forwarded to ``_decode_chatml``; when true the
            result is a ``(text, end_reason)`` pair rather than a string.

    Returns:
        Whatever ``_decode_chatml`` returns.
    """
    token_ids = tokens.cpu().numpy().tolist() if torch.is_tensor(tokens) else tokens

    decode_options = dict(
        stop_words=[],
        eod_token_ids=[tokenizer.im_start_id, tokenizer.im_end_id],
        tokenizer=tokenizer,
        raw_text_len=raw_text_len,
        context_length=context_length,
        verbose=verbose,
        return_end_reason=return_end_reason,
    )
    return _decode_chatml(token_ids, **decode_options)
|
|
|
|
| def _decode_chatml( |
| tokens: List[int], |
| *, |
| stop_words: List[str], |
| eod_token_ids: List[int], |
| tokenizer: PreTrainedTokenizer, |
| raw_text_len: int, |
| context_length: int, |
| verbose: bool = False, |
| return_end_reason: bool = False, |
| chat_format = "chatml", |
| ): |
| end_reason = f"Gen length {len(tokens)}" |
| eod_token_idx = context_length |
| for eod_token_idx in range(context_length, len(tokens)): |
| if tokens[eod_token_idx] in eod_token_ids: |
| end_reason = f"Gen {tokenizer.decode([tokens[eod_token_idx]])!r}" |
| break |
|
|
| trim_decode_tokens = tokenizer.decode(tokens[:eod_token_idx])[raw_text_len:] |
| if verbose: |
| print("\nRaw Generate w/o EOD:", tokenizer.decode(tokens)[raw_text_len:]) |
| print("\nRaw Generate:", trim_decode_tokens) |
| print("\nEnd Reason:", end_reason) |
| for stop_word in stop_words: |
| trim_decode_tokens = trim_decode_tokens.replace(stop_word, "").strip() |
| trim_decode_tokens = trim_decode_tokens.strip() |
| if verbose: |
| print("\nGenerate:", trim_decode_tokens) |
|
|
| if return_end_reason: |
| return trim_decode_tokens, end_reason |
| else: |
| return trim_decode_tokens |
|
|
|
|
|
|
def load_models_tokenizer(args):
    """Load the model and tokenizer named by ``args.checkpoint_path``.

    Both are loaded with ``trust_remote_code=True``; the model is loaded with
    ``device_map="auto"`` and put in eval mode. The checkpoint's generation
    config is attached to the model with sampling switched off.

    Args:
        args: Object with a ``checkpoint_path`` attribute.

    Returns:
        ``(model, tokenizer)``.
    """
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from transformers.generation import GenerationConfig

    checkpoint = args.checkpoint_path

    tokenizer = AutoTokenizer.from_pretrained(checkpoint, trust_remote_code=True)

    model = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        device_map="auto",
        trust_remote_code=True,
    )
    model = model.eval()

    generation_config = GenerationConfig.from_pretrained(checkpoint, trust_remote_code=True)
    model.generation_config = generation_config
    # Disable sampling for evaluation runs.
    model.generation_config.do_sample = False

    return model, tokenizer
|
|
|
|
def format_example(line):
    """Render one dataset row as a multiple-choice prompt.

    Args:
        line: Mapping with a ``'question'`` entry and one entry per letter in
            the module-level ``choices``.

    Returns:
        The instruction sentence, a blank line, the question, then one
        ``"<letter>. <text>"`` line per choice, each ending in a newline.
    """
    instruction = 'The following is a multiple-choice question. Please choose the most suitable one among A, B, C and D as the answer to this question.\n\n'
    pieces = [instruction + line['question'] + "\n"]
    for letter in choices:
        option_text = line[f"{letter}"]
        pieces.append(f'{letter}. {option_text}\n')
    return "".join(pieces)
|
|
|
|
def process_before_extraction(gen, choice_dict):
    """Replace each choice's text in ``gen`` with that choice's letter.

    This lets a reply that quotes an option verbatim be parsed like one that
    names the letter. Matching is case-insensitive and ignores trailing
    periods on the option text. Longer option texts are substituted first so
    an option that is a substring of another does not pre-empt it.

    Args:
        gen: The model's reply.
        choice_dict: Mapping of choice letter to option text. Values that are
            ``None``, NaN, or blank after removing trailing periods are
            skipped; other non-string values are compared via ``str()``.

    Returns:
        ``gen`` with option texts replaced by their letters.
    """
    replacements = []
    for key, val in choice_dict.items():
        # Missing cells arrive as None or NaN (NaN is the only value != itself).
        if val is None or (isinstance(val, float) and val != val):
            continue
        text = str(val).rstrip(".")
        # An empty pattern matches between every character, which would
        # scatter the letter across the whole reply.
        if not text.strip():
            continue
        replacements.append((key, text))

    replacements.sort(key=lambda item: len(item[1]), reverse=True)
    for key, text in replacements:
        pattern = re.compile(re.escape(text), re.IGNORECASE)
        gen = pattern.sub(key, gen)
    return gen
|
|
def extract_choice(gen, choice_list):
    """Pick the answer letter (A-D) out of a free-form reply.

    Four regular expressions are tried in order and the first hit wins:

    1. a "choose" / "answer" / "choice" cue followed shortly by a letter;
    2. a letter followed shortly by "correct" / "right" without a negation;
    3. a letter at the very start, followed by ``.``, ``,``, ``:`` or the end;
    4. any A-D not touching another ASCII letter (nor followed by ``=``).

    If none matches, the option text most similar to ``gen`` according to
    ``process.extractOne`` decides the letter.

    Args:
        gen: The reply text.
        choice_list: Option texts in the same order as the module-level
            ``choices``.

    Returns:
        The selected letter.
    """
    patterns = (
        r"(?:(?:[Cc]hoose)|(?:(?:[Aa]nswer|[Cc]hoice)(?![^ABCD]{0,20}?(?:n't|not))[^ABCD]{0,10}?\b(?:|is|:|be))\b)[^ABCD]{0,20}?\b(A|B|C|D)\b",
        r"\b(A|B|C|D)\b(?![^ABCD]{0,8}?(?:n't|not)[^ABCD]{0,5}?(?:correct|right))[^ABCD]{0,10}?\b(?:correct|right)\b",
        r"^(A|B|C|D)(?:\.|,|:|$)",
        r"(?<![a-zA-Z])(A|B|C|D)(?![a-zA-Z=])",
    )
    for pattern in patterns:
        match = re.search(pattern, gen)
        if match is not None:
            return match.group(1)

    # No letter found: fall back to the closest option text.
    closest_text = process.extractOne(gen, choice_list)[0]
    return choices[choice_list.index(closest_text)]
|
|
def extract_answer(response, row):
    """Map a model reply to a choice letter for the question in ``row``.

    Option texts quoted in the reply are first rewritten to their letters,
    then the letter is extracted from the rewritten reply.

    Args:
        response: The model's reply.
        row: Mapping holding the option text under each letter of the
            module-level ``choices``.

    Returns:
        The predicted letter.
    """
    letter_to_text = {}
    for letter in choices:
        letter_to_text[letter] = row[letter]
    normalized = process_before_extraction(response, letter_to_text)

    option_texts = [row[letter] for letter in choices]
    return extract_choice(normalized, option_texts)
|
|
@torch.no_grad()
def eval_subject(
    model,
    tokenizer,
    subject_name,
    test_df,
    save_result_dir=None,
    overwrite=False,
    **kwargs
):
    """Evaluate one subject and return a 0/1 correctness score per row.

    If a result file for the subject already exists under
    ``save_result_dir`` and ``overwrite`` is false, the predictions stored in
    it are re-scored against ``test_df`` and no generation takes place.
    Otherwise every row is sent through ``chat``, the answer letter is
    extracted, and (when ``save_result_dir`` is set) the results are written
    to ``<save_result_dir>/<subject_name>_result.csv``.

    Args:
        model: Model passed to ``chat``.
        tokenizer: Tokenizer passed to ``chat``.
        subject_name: Used to name the result file.
        test_df: DataFrame with ``question``, ``A``-``D`` and, optionally,
            ``answer`` columns. Result columns are added to it in place when
            results are saved.
        save_result_dir: Directory for result files; ``None`` disables both
            caching and saving.
        overwrite: Re-run even when a result file exists.
        **kwargs: ``debug`` (bool) enables an extra per-row printout.

    Returns:
        List of 1/0 flags, one per row that has a reference answer.
    """
    # Prefer an explicit debug flag; otherwise fall back to the script-level
    # `args` if one exists, so the --debug switch keeps working while the
    # function stays usable when this module is imported.
    debug = kwargs.get("debug")
    if debug is None:
        debug = getattr(globals().get("args"), "debug", False)

    result_path = None
    if save_result_dir:
        result_path = os.path.join(save_result_dir, f'{subject_name}_result.csv')

    if result_path and not overwrite and os.path.exists(result_path):
        print(f"{result_path} existed, skip!")
        cached_df = pd.read_csv(result_path)
        return [
            1 if pred == answer else 0
            for answer, pred in zip(test_df['answer'], cached_df['model_output'])
        ]

    predictions = []
    responses = []
    score = []

    for _, row in tqdm(test_df.iterrows(), total=len(test_df)):
        question = format_example(row)

        # Each question is asked in a fresh conversation.
        response, _ = chat(
            model,
            tokenizer,
            question,
            history=None,
        )
        print(question)
        print(response)
        pred = extract_answer(response, row)
        print(pred)
        print("======================")

        if 'answer' in row:
            correct = 1 if pred == row['answer'] else 0
            score.append(correct)
            if debug:
                print(f'{question} pred: {pred} ref: {row["answer"]}')
        predictions.append(pred)
        # Keep every reply; storing only the loop variable would write the
        # last reply into every row.
        responses.append(response)

    if save_result_dir:
        test_df['model_output'] = predictions
        test_df['model_response'] = responses
        if score:
            test_df["correctness"] = score
        os.makedirs(save_result_dir, exist_ok=True)
        test_df.to_csv(result_path, encoding="utf-8", index=False)

    return score
|
|
|
|
def cal_mmlu(res):
    """Print accuracy per category and overall.

    Args:
        res: Mapping of subject name to a list of 0/1 scores. Every subject
            listed in ``TASK_NAME_MAPPING`` must be present.

    Categories without any scored row are left out of the report, and the
    overall line reports "n/a" when nothing was scored, instead of dividing
    by zero.
    """
    acc_sum_dict = {}
    cnt_dict = {}
    for class_, subjects in TASK_NAME_MAPPING.items():
        acc_sum_dict[class_] = sum(sum(res[tt]) for tt in subjects)
        cnt_dict[class_] = sum(len(res[tt]) for tt in subjects)

    acc_sum = sum(acc_sum_dict.values())
    cnt = sum(cnt_dict.values())

    print('\n\n\n')
    for k in TASK_NAME_MAPPING:
        if cnt_dict[k]:
            print('%s ACC: %.2f ' % (
                k, acc_sum_dict[k] * 100 / cnt_dict[k]))
    if cnt:
        print('AVERAGE ACC:%.2f ' % (acc_sum * 100 / cnt))
    else:
        print('AVERAGE ACC: n/a (no results)')
| |
|
|
def main(args):
    """Evaluate every subject in ``SUBJECTS`` and print the accuracy report.

    Args:
        args: Parsed command-line arguments; ``checkpoint_path``,
            ``eval_data_path`` and ``overwrite`` are read here.
    """
    print("loading model weights")
    model, tokenizer = None, None
    if args.checkpoint_path is not None:
        model, tokenizer = load_models_tokenizer(args)
    print("model loaded")

    output_dir = "outs_chat/mmlu_eval_result"
    column_names = ['question', 'A', 'B', 'C', 'D', 'answer']

    dev_result = {}
    for subject_name in tqdm(SUBJECTS):
        test_file_path = os.path.join(
            args.eval_data_path, 'test', f'{subject_name}_test.csv'
        )
        # The CSV files carry no header row, so column names are supplied.
        test_df = pd.read_csv(test_file_path, names=column_names)

        dev_result[subject_name] = eval_subject(
            model,
            tokenizer,
            subject_name,
            test_df,
            save_result_dir=output_dir,
            overwrite=args.overwrite,
        )

    cal_mmlu(dev_result)
|
|
|
|
# Subjects grouped into the categories used for the accuracy report.
TASK_NAME_MAPPING = {
    'stem': [
        'abstract_algebra',
        'anatomy',
        'astronomy',
        'college_biology',
        'college_chemistry',
        'college_computer_science',
        'college_mathematics',
        'college_physics',
        'computer_security',
        'conceptual_physics',
        'electrical_engineering',
        'elementary_mathematics',
        'high_school_biology',
        'high_school_chemistry',
        'high_school_computer_science',
        'high_school_mathematics',
        'high_school_physics',
        'high_school_statistics',
        'machine_learning',
    ],
    'Humanities': [
        'formal_logic',
        'high_school_european_history',
        'high_school_us_history',
        'high_school_world_history',
        'international_law',
        'jurisprudence',
        'logical_fallacies',
        'moral_disputes',
        'moral_scenarios',
        'philosophy',
        'prehistory',
        'professional_law',
        'world_religions',
    ],
    'other': [
        'business_ethics',
        'college_medicine',
        'human_aging',
        'management',
        'marketing',
        'medical_genetics',
        'miscellaneous',
        'nutrition',
        'professional_accounting',
        'professional_medicine',
        'virology',
        'global_facts',
        'clinical_knowledge',
    ],
    'social': [
        'econometrics',
        'high_school_geography',
        'high_school_government_and_politics',
        'high_school_macroeconomics',
        'high_school_microeconomics',
        'high_school_psychology',
        'human_sexuality',
        'professional_psychology',
        'public_relations',
        'security_studies',
        'sociology',
        'us_foreign_policy',
    ],
}
# Every subject, flattened in category order.
SUBJECTS = [subject for group in TASK_NAME_MAPPING.values() for subject in group]
# Answer letters, in column order.
choices = list("ABCD")
|
|
if __name__ == '__main__':
    # Command-line entry point: parse options, seed the RNGs, run the evaluation.
    parser = argparse.ArgumentParser(description='Test HF checkpoint.')
    parser.add_argument('-c', '--checkpoint-path', type=str, help='Checkpoint path', default="Qwen/Qwen-7B-Chat")
    parser.add_argument('-s', '--seed', type=int, default=1234, help='Random seed')

    # Extra arguments required for the evaluation task.
    group = parser.add_argument_group(title='Evaluation options')
    # Required: main() joins this path unconditionally, so a missing value
    # should be reported by argparse up front rather than surface as a
    # TypeError after the model has been loaded.
    group.add_argument('-d', '--eval_data_path', type=str, required=True,
                       help='Path to eval data')
    group.add_argument("--debug", action='store_true', default=False,
                       help='Print infos.')
    group.add_argument("--overwrite", action='store_true', default=False,
                       help='Overwrite existed results')

    args = parser.parse_args()
    set_seed(args.seed)

    main(args)