File size: 7,921 Bytes
7ed0fb5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
from transformers import Qwen2VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import json
import tqdm
from math_verify import parse, verify
import argparse
import pandas as pd
from torch.multiprocessing import Process, set_start_method, Manager
from transformers.utils.logging import disable_progress_bar
disable_progress_bar()
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>> 1. get evaluation configuration <<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def get_eval_config():
parser = argparse.ArgumentParser(description="Inference script for GeoQA evaluation.")
parser.add_argument("--model_path", required=True, type=str, help="Path to the model checkpoint (e.g., qwen2vl model or a fine-tuned model).")
parser.add_argument("--batch_size", default=4, type=int, help="Batch size for inference. Reduce if GPU OOM (default: 50).")
parser.add_argument("--output_path", required=True, type=str, help="Path to save inference result (e.g., JSON file).")
parser.add_argument("--prompt_path", required=True, type=str, help="Path to the prompts JSONL file for GeoQA evaluation.")
all_gpu = ",".join(map(str, range(torch.cuda.device_count())))
parser.add_argument("--gpu_ids", default=all_gpu, help="comma-separated list of GPU IDs to use")
args = parser.parse_args()
return args
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>>>>>>> 2. load testset <<<<<<<<<<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def prepare_test_messages(testset_path):
testset_data = pd.read_json(testset_path, lines=True).to_dict(orient="records")
QUESTION_TEMPLATE = "{Question} Output the thinking process in <think> </think> and final answer (number) in <answer> </answer> tags."
tested_messages = []
for i in testset_data:
message = [{
"role": "user",
"content": [
{
"type": "image",
"image": f"file://{i['image_path']}"
},
{
"type": "text",
"text": QUESTION_TEMPLATE.format(Question=i['question'])
}
]
}]
tested_messages.append(message)
return testset_data, tested_messages
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>> 3. use several GPUs to accelerate inference at testset <<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def init_model(model_path, gpu_id):
"""init a model(args.model_path) on a specific gpu"""
# We recommend enabling flash_attention_2 for better acceleration and memory saving, especially in multi-image and video scenarios.
model = Qwen2VLForConditionalGeneration.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
attn_implementation="flash_attention_2",
device_map=f"cuda:{gpu_id}",
)
# default processer
processor = AutoProcessor.from_pretrained(model_path, use_fast=True)
return model, processor
def answer_a_batch_question_qwen(batch_messages, model, processor):
""" let qwen answer a batch of questions """
text = [processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True) for msg in batch_messages]
image_inputs, video_inputs = process_vision_info(batch_messages)
inputs = processor(
text=text,
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(model.device)
generated_ids = model.generate(**inputs, use_cache=True, max_new_tokens=1024) # do_sample=False
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
batch_output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
return batch_output_text
def infer_on_single_gpu(model_path, device_id, chunk_of_tested_messages, batch_size, results=None):
"""init model on this single gpu and let it answer asign chunk of questions"""
model, processor = init_model(model_path, device_id)
### split batch
responses = []
batch_messages_list = [chunk_of_tested_messages[start: start + batch_size]
for start in range(0, len(chunk_of_tested_messages), batch_size)]
for batch_messages in tqdm.auto.tqdm(batch_messages_list, desc=f"GPU {device_id} progress", position=device_id, leave=False):
batch_output_text = answer_a_batch_question_qwen(batch_messages, model, processor)
responses.extend(batch_output_text)
results[device_id] = responses
return
def multi_gpu_inference(prompts, gpu_ids, model_path, batch_size):
""" let each gpu (along with a model) answer a chunk of questions """
set_start_method("spawn", force=True)
manager = Manager()
gpu_id2result = manager.dict()
gpu_ids = [int(gpu_id.strip()) for gpu_id in gpu_ids.split(',')]
num_gpus = len(gpu_ids)
chunk_size = len(prompts) // num_gpus
processes = []
for i, gpu_id in enumerate(gpu_ids):
start_idx = i * chunk_size
end_idx = (i + 1) * chunk_size if i != num_gpus - 1 else len(prompts)
chunk = prompts[start_idx: end_idx]
process = Process(target=infer_on_single_gpu, args=(model_path, gpu_id, chunk, batch_size, gpu_id2result))
process.start()
processes.append(process)
# for process in tqdm.auto.tqdm(processes, desc="Inference progress", position=num_gpus, leave=True):
for process in processes:
process.join()
all_predicts = []
for gpu_id in gpu_ids:
all_predicts.extend(gpu_id2result[gpu_id])
return all_predicts
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>>>>>>> 4. compute metrics <<<<<<<<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def compute_metrics(testset_data, all_predicts):
final_output = []
correct_number = 0
for input_example, model_output in zip(testset_data, all_predicts):
original_output = model_output
ground_truth = input_example['ground_truth']
model_answer = parse(original_output)
# Count correct answers
if model_answer is not None and float(verify(model_answer,parse(ground_truth)))>0:
correct_number += 1
is_correct = True
else:
is_correct = False
try:
result = {
'question': input_example,
'ground_truth': ground_truth,
'model_output': original_output,
'extracted_answer':str(model_answer[0]) if model_answer is not None else None,
'is_correct':is_correct
}
except Exception as e:
print("no answer parsed",e,model_answer)
result = {
'question': input_example,
'ground_truth': ground_truth,
'model_output': original_output,
'extracted_answer':None,
'is_correct':is_correct
}
final_output.append(result)
# Calculate and print accuracy
accuracy = correct_number / len(tested_messages) * 100
print(f"\nAccuracy: {accuracy:.2f}%")
# Save results to a JSON file
with open(args.output_path, "w") as f:
json.dump({
'accuracy': accuracy,
'results': final_output
}, f, indent=2, ensure_ascii=False)
print(f"Results saved to {args.output_path}")
if __name__ == "__main__":
args = get_eval_config()
testset_data, tested_messages = prepare_test_messages(testset_path=args.prompt_path)
all_predicts = multi_gpu_inference(tested_messages, args.gpu_ids, args.model_path, args.batch_size)
compute_metrics(testset_data, all_predicts)
|