# Evaluate a Qwen2-VL checkpoint on the GeoQA test prompts: run batched
# image+text generation, then score the answers with math_verify.
from transformers import Qwen2VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import json
from tqdm import tqdm
import re
from math_verify import parse, verify

MODEL_PATH = "<MODEL_PATH>"
BSZ = 50
OUTPUT_PATH = "<OUTPUT_LOG>"
PROMPT_PATH = "./prompts/geoqa_test_prompts.jsonl"
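# MODEL_PATH and OUTPUT_PATH are placeholders to fill in before running.
# BSZ is the generation batch size; lower it if a batch does not fit in
# GPU memory.
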
# Load the model in bf16 and shard it across available GPUs.
# flash_attention_2 requires the flash-attn package; swap in "sdpa" if it
# is not installed.
model = Qwen2VLForConditionalGeneration.from_pretrained(
    MODEL_PATH,
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    device_map="auto",
)

processor = AutoProcessor.from_pretrained(MODEL_PATH)

data = []
with open(PROMPT_PATH, "r") as f:
    for line in f:
        data.append(json.loads(line))
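# Each JSONL line is one test example; the fields used below are
# 'image_path', 'question', and 'ground_truth'.
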
QUESTION_TEMPLATE = "{Question} Output the thinking process in <think> </think> and final answer (number) in <answer> </answer> tags."

# Build one single-turn, image+text conversation per example.
messages = []
for i in data:
    message = [{
        "role": "user",
        "content": [
            {
                "type": "image",
                "image": f"file://{i['image_path']}"
            },
            {
                "type": "text",
                "text": QUESTION_TEMPLATE.format(Question=i['question'])
            }
        ]
    }]
    messages.append(message)

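# Inference: render each conversation with the chat template, preprocess
# the images, and generate greedily in batches of BSZ.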
all_outputs = []
for i in tqdm(range(0, len(messages), BSZ)):
    batch_messages = messages[i:i + BSZ]

    # Leave the generation prompt open for the assistant turn.
    text = [processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True) for msg in batch_messages]

    image_inputs, video_inputs = process_vision_info(batch_messages)
    inputs = processor(
        text=text,
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    inputs = inputs.to("cuda")

    # Greedy decoding with the KV cache enabled.
    generated_ids = model.generate(**inputs, use_cache=True, max_new_tokens=1024, do_sample=False)

    # Strip the prompt tokens so only newly generated tokens are decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    batch_output_text = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )

    all_outputs.extend(batch_output_text)
    print(f"Processed batch {i//BSZ + 1}/{(len(messages) + BSZ - 1)//BSZ}")

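# Scoring: math_verify's parse() extracts candidate answers from free-form
# text and returns a list (empty when nothing is found); verify(gold, answer)
# checks mathematical equivalence.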
final_output = []
correct_number = 0
for input_example, model_output in zip(data, all_outputs):
    original_output = model_output
    ground_truth = input_example['ground_truth']
    model_answer = parse(original_output)

    # verify() takes the gold answer first; guard against an empty parse.
    if model_answer and verify(parse(ground_truth), model_answer):
        correct_number += 1
        is_correct = True
    else:
        is_correct = False
        if not model_answer:
            print("no answer parsed from model output")

    result = {
        'question': input_example['question'],
        'ground_truth': ground_truth,
        'model_output': original_output,
        'extracted_answer': str(model_answer[0]) if model_answer else None,
        'is_correct': is_correct,
    }
    final_output.append(result)

accuracy = correct_number / len(data) * 100
print(f"\nAccuracy: {accuracy:.2f}%")

output_path = OUTPUT_PATH
with open(output_path, "w") as f:
    json.dump({
        'accuracy': accuracy,
        'results': final_output
    }, f, indent=2, ensure_ascii=False)

print(f"Results saved to {output_path}")

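# Usage sketch (assumed script name; adjust to your checkout):
#   python test_qwen2vl_geoqa.py
# after filling in MODEL_PATH and OUTPUT_PATH above.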