# videoxl/eval/model_vqa_mmmu.py
import ast
import re
import random
import numpy as np
import os
import json
import yaml
import torch
from tqdm import tqdm
from datasets import load_dataset, concatenate_datasets
from argparse import ArgumentParser
from bunny.model.builder import load_pretrained_model
from bunny.util.mm_utils import get_model_name_from_path, tokenizer_image_token
from bunny.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN
from bunny.conversation import conv_templates
# Mapping from MMMU subject abbreviations to the full subject names, which
# double as the dataset config names passed to `load_dataset`.
CAT_SHORT2LONG = {
    'acc': 'Accounting',
    'agri': 'Agriculture',
    'arch': 'Architecture_and_Engineering',
    'art': 'Art',
    'art_theory': 'Art_Theory',
    'bas_med': 'Basic_Medical_Science',
    'bio': 'Biology',
    'chem': 'Chemistry',
    'cli_med': 'Clinical_Medicine',
    'cs': 'Computer_Science',
    'design': 'Design',
    'diag_med': 'Diagnostics_and_Laboratory_Medicine',
    'econ': 'Economics',
    'elec': 'Electronics',
    'ep': 'Energy_and_Power',
    'fin': 'Finance',
    'geo': 'Geography',
    'his': 'History',
    'liter': 'Literature',
    'manage': 'Manage',
    'mark': 'Marketing',
    'mate': 'Materials',
    'math': 'Math',
    'mech': 'Mechanical_Engineering',
    'music': 'Music',
    'phar': 'Pharmacy',
    'phys': 'Physics',
    'psy': 'Psychology',
    'pub_health': 'Public_Health',
    'socio': 'Sociology'
}
# ----------- Process Multi-choice -------------
def parse_multi_choice_response(response, all_choices, index2ans):
    """
    Parse the prediction from the generated response.
    Return the predicted choice index, e.g. 'A', 'B', 'C' or 'D'.
    """
    for char in [',', '.', '!', '?', ';', ':', "'"]:
        response = response.strip(char)
    response = " " + response + " "  # pad with spaces to avoid partial matches

    index_ans = True
    ans_with_brack = False
    candidates = []
    for choice in all_choices:  # e.g., (A) (B) (C) (D)
        if f'({choice})' in response:
            candidates.append(choice)
            ans_with_brack = True

    if len(candidates) == 0:
        for choice in all_choices:  # e.g., A B C D
            if f' {choice} ' in response:
                candidates.append(choice)

    # if the above yields no candidates and the response is longer than
    # 5 tokens, try to match the answer content itself
    if len(candidates) == 0 and len(response.split()) > 5:
        for index, ans in index2ans.items():
            if ans.lower() in response.lower():
                candidates.append(index)
                index_ans = False  # it's a content answer, not an index

    if len(candidates) == 0:  # still no answer, randomly choose one
        pred_index = random.choice(all_choices)
    elif len(candidates) > 1:
        start_indexes = []
        if index_ans:
            if ans_with_brack:
                for can in candidates:
                    index = response.rfind(f'({can})')
                    start_indexes.append(index)  # -1 will be ignored anyway
            else:
                for can in candidates:
                    index = response.rfind(f' {can} ')
                    start_indexes.append(index)
        else:
            for can in candidates:
                index = response.lower().rfind(index2ans[can].lower())
                start_indexes.append(index)
        # keep the candidate that appears last in the response
        pred_index = candidates[np.argmax(start_indexes)]
    else:  # if only one candidate, use it
        pred_index = candidates[0]

    return pred_index
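# Example (illustrative values): "The answer is (B)." with
# all_choices=['A', 'B', 'C', 'D'] matches the bracketed form and returns 'B';
# "I think the color shown is blue" with index2ans={'A': 'red', 'B': 'blue', ...}
# has more than 5 tokens, so the content fallback matches 'blue' and also
# returns 'B'.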
def call_bunny_engine_df(args, sample, model, tokenizer=None, processor=None):
    def deal_with_prompt(input_text):
        # prepend the image placeholder token so the vision features are
        # spliced in at the start of the prompt
        return DEFAULT_IMAGE_TOKEN + '\n' + input_text

    prompt = deal_with_prompt(sample['final_input_prompt'])

    conv = conv_templates[args.conv_mode].copy()
    conv.append_message(conv.roles[0], prompt)
    conv.append_message(conv.roles[1], None)
    prompt = conv.get_prompt()

    input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX,
                                      return_tensors='pt').unsqueeze(0).cuda()

    image = sample['image']
    if image is not None:
        # greedy decoding: with do_sample=False, temperature/top_p have no effect
        output_ids = model.generate(
            input_ids,
            images=image.unsqueeze(0).to(dtype=model.dtype, device='cuda', non_blocking=True),
            do_sample=False,
            temperature=0,
            top_p=None,
            max_new_tokens=128,
            use_cache=True)
        input_token_len = input_ids.shape[1]
        response = tokenizer.batch_decode(output_ids[:, input_token_len:],
                                          skip_special_tokens=True)[0]
    else:  # the sample actually has multiple images; this engine handles only one
        if sample['question_type'] == 'multiple-choice':
            all_choices = sample['all_choices']
            response = random.choice(all_choices)
        else:
            response = 'INVALID GENERATION FOR MULTIPLE IMAGE INPUTS'
    return response
def load_yaml(file_path):
    with open(file_path, 'r') as stream:
        try:
            yaml_dict = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # re-raise after logging; otherwise yaml_dict would be unbound below
            print(exc)
            raise
    return yaml_dict
def parse_img_path(text):
    return re.findall("<img='(.*?)'>", text)
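# Example (illustrative path): parse_img_path("<img='figs/option_a.png'> a diagram")
# returns ['figs/option_a.png'].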
def process_single_sample(data):
    question = data['question']
    o_imgs_paths = []
    for option in data['options']:
        for img_path in parse_img_path(option):
            o_imgs_paths.append(img_path)

    if len(o_imgs_paths) > 1:  # multiple images in options; downstream falls back to random choice
        return {'id': data['id'], 'question': question, 'options': data['options'],
                'answer': data['answer'], 'image': None,
                'question_type': data['question_type']}
    else:
        return {'id': data['id'], 'question': question, 'options': data['options'],
                'answer': data['answer'], 'image': data['image_1'],
                'question_type': data['question_type']}
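# Example (illustrative): if two options embed images, e.g. "<img='a.png'>" and
# "<img='b.png'>", the sample comes back with image=None and is later answered
# by the random-choice fallback in call_bunny_engine_df.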
# DATA PROCESSING
def construct_prompt(sample, config):
    question = sample['question']
    # options is stored as a stringified Python list; parse it safely
    options = ast.literal_eval(sample['options'])
    example = ""
    if sample['question_type'] == 'multiple-choice':
        start_chr = 'A'
        prediction_range = []
        index2ans = {}
        for option in options:
            prediction_range.append(start_chr)
            example += f"({start_chr}) {option}\n"
            index2ans[start_chr] = option
            start_chr = chr(ord(start_chr) + 1)
        empty_prompt_sample_structure = config['multi_choice_example_format']
        empty_prompt = empty_prompt_sample_structure.format(question, example)
        res_dict = {}
        res_dict['index2ans'] = index2ans
        res_dict['correct_choice'] = sample['answer']
        res_dict['all_choices'] = prediction_range
        res_dict['empty_prompt'] = empty_prompt
        if config['task_instructions']:
            res_dict['final_input_prompt'] = config['task_instructions'].strip() + '\n\n' + empty_prompt
        else:
            res_dict['final_input_prompt'] = empty_prompt
        res_dict['gt_content'] = options[ord(sample['answer'].upper()) - ord('A')]
    else:
        empty_prompt_sample_structure = config['short_ans_example_format']
        empty_prompt = empty_prompt_sample_structure.format(question)
        res_dict = {}
        res_dict['empty_prompt'] = empty_prompt
        if config['task_instructions']:
            res_dict['final_input_prompt'] = config['task_instructions'].strip() + '\n\n' + empty_prompt
        else:
            res_dict['final_input_prompt'] = empty_prompt
        res_dict['gt_content'] = sample['answer']

    res_dict.update(sample)
    return res_dict
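# Example (assuming a config whose multi_choice_example_format is
# "{}\n{}\nAnswer with the option's letter from the given choices directly."):
# question "Which planet is largest?" with options ['Mars', 'Jupiter'] yields
# a final_input_prompt of:
#     Which planet is largest?
#     (A) Mars
#     (B) Jupiter
#     Answer with the option's letter from the given choices directly.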
def run_model(args, samples, model, call_model_engine_fn=None, tokenizer=None, processor=None):
    out_samples = dict()
    with torch.no_grad():
        for sample in tqdm(samples):
            if args.small_gpu_usage and sample['image'] is not None:
                # images stay on the CPU and are moved to the GPU one sample at a time
                sample['image'] = sample['image'].cuda()
            response = call_model_engine_fn(args, sample, model, tokenizer, processor)
            if args.small_gpu_usage and sample['image'] is not None:
                sample['image'] = sample['image'].cpu()

            if sample['question_type'] == 'multiple-choice':
                pred_ans = parse_multi_choice_response(response, sample['all_choices'], sample['index2ans'])
            else:  # open question
                pred_ans = response
            out_samples[sample['id']] = pred_ans
    return out_samples
def set_seed(seed_value):
    """
    Set the seed for PyTorch (both CPU and CUDA), Python, and NumPy for reproducible results.

    :param seed_value: An integer value to be used as the seed.
    """
    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed_value)
        torch.cuda.manual_seed_all(seed_value)  # for multi-GPU setups
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
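# Note: cudnn.deterministic=True with cudnn.benchmark=False makes convolution
# results reproducible across runs, at some cost in throughput.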
def main():
    parser = ArgumentParser()
    parser.add_argument('--model-path', type=str, default=None)
    parser.add_argument('--model-base', type=str, default=None)
    parser.add_argument('--model-type', type=str, default=None)
    parser.add_argument('--conv-mode', type=str, default=None)
    parser.add_argument('--data-path', type=str, default=None)
    parser.add_argument('--config-path', type=str, default=None)
    parser.add_argument('--output-path', type=str, default=None)
    parser.add_argument('--split', type=str, default='validation')
    parser.add_argument('--seed', type=int, default=42)
    parser.add_argument('--small-gpu-usage', action='store_true')
    args = parser.parse_args()

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    set_seed(args.seed)

    print('bunny_initializing...')
    processor = None
    call_model_engine = call_bunny_engine_df

    # load config and flatten single-element list values to scalars
    args.config = load_yaml(args.config_path)
    for key, value in args.config.items():
        if key != 'eval_params' and isinstance(value, list):
            assert len(value) == 1, 'key {} has more than one value'.format(key)
            args.config[key] = value[0]

    # load one sub-dataset per subject, then merge them all
    sub_dataset_list = []
    for subject in CAT_SHORT2LONG.values():
        sub_dataset = load_dataset(args.data_path, subject, split=args.split)
        sub_dataset_list.append(sub_dataset)
    dataset = concatenate_datasets(sub_dataset_list)

    # load model
    model_path = os.path.expanduser(args.model_path)
    model_name = get_model_name_from_path(model_path)
    tokenizer, model, vis_processors, context_len = load_pretrained_model(model_path, args.model_base, model_name,
                                                                          args.model_type)

    samples = []
    print('Processing MMMU dataset...')
    for sample in tqdm(dataset):
        sample = process_single_sample(sample)
        sample = construct_prompt(sample, args.config)
        if sample['image'] is not None:
            pixel_values = vis_processors.preprocess(sample['image'].convert('RGB'),
                                                     return_tensors='pt')['pixel_values'][0]
            # with --small-gpu-usage, keep images on the CPU until they are needed
            sample['image'] = pixel_values if args.small_gpu_usage else pixel_values.to(device)
        samples.append(sample)

    print('Start to evaluate...')
    out_samples = run_model(args, samples, model, call_model_engine, tokenizer, processor)

    output_dir = os.path.dirname(args.output_path)
    if output_dir:  # guard against a bare filename with no directory part
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output_path, 'w') as f:
        json.dump(out_samples, f, indent=4)
if __name__ == '__main__':
    main()
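# Example invocation (all paths and names below are illustrative, not taken
# from this repo):
#   python model_vqa_mmmu.py \
#       --model-path ./checkpoints/bunny-phi-2 \
#       --model-type phi-2 \
#       --conv-mode bunny \
#       --data-path MMMU/MMMU \
#       --config-path eval/mmmu/config.yaml \
#       --output-path outputs/mmmu_validation.json \
#       --split validation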