# Video-R1 / previous_version / Video-R1-main-previous / src / eval / test_qwen2vl_geoqa_multigpu.py
# DingZhenDojoCat's picture
# Add files using upload-large-folder tool
# 7ed0fb5 verified
from transformers import Qwen2VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import json
import tqdm
from math_verify import parse, verify
import argparse
import pandas as pd
from torch.multiprocessing import Process, set_start_method, Manager
from transformers.utils.logging import disable_progress_bar
disable_progress_bar()
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>> 1. get evaluation configuration <<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def get_eval_config():
    """Parse the command-line options of the GeoQA evaluation script.

    Returns:
        The ``argparse.Namespace`` with ``model_path``, ``batch_size``,
        ``output_path``, ``prompt_path`` and ``gpu_ids``. ``gpu_ids`` is a
        comma-separated string that defaults to every index below
        ``torch.cuda.device_count()`` (an empty string when no GPU is visible).
    """
    parser = argparse.ArgumentParser(description="Inference script for GeoQA evaluation.")
    parser.add_argument("--model_path", required=True, type=str, help="Path to the model checkpoint (e.g., qwen2vl model or a fine-tuned model).")
    # %(default)s keeps the help text in sync with the real default; the old
    # text advertised "default: 50" while the actual default was 4.
    parser.add_argument("--batch_size", default=4, type=int, help="Batch size for inference. Reduce if GPU OOM (default: %(default)s).")
    parser.add_argument("--output_path", required=True, type=str, help="Path to save inference result (e.g., JSON file).")
    parser.add_argument("--prompt_path", required=True, type=str, help="Path to the prompts JSONL file for GeoQA evaluation.")
    all_gpu = ",".join(map(str, range(torch.cuda.device_count())))
    parser.add_argument("--gpu_ids", default=all_gpu, help="comma-separated list of GPU IDs to use")
    return parser.parse_args()
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>>>>>>> 2. load testset <<<<<<<<<<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def prepare_test_messages(testset_path):
    """Read the JSONL test set and build one chat message per record.

    Args:
        testset_path: Path of a JSON-lines file; every record must provide
            the ``image_path`` and ``question`` keys.

    Returns:
        A ``(testset_data, tested_messages)`` tuple: the raw records as a list
        of dicts, and a parallel list of single-turn user conversations, each
        holding the image reference followed by the templated question text.
    """
    QUESTION_TEMPLATE = "{Question} Output the thinking process in <think> </think> and final answer (number) in <answer> </answer> tags."
    testset_data = pd.read_json(testset_path, lines=True).to_dict(orient="records")

    def build_conversation(record):
        # One user turn: the image first, then the question wrapped in the template.
        image_part = {"type": "image", "image": f"file://{record['image_path']}"}
        text_part = {"type": "text", "text": QUESTION_TEMPLATE.format(Question=record['question'])}
        return [{"role": "user", "content": [image_part, text_part]}]

    tested_messages = [build_conversation(record) for record in testset_data]
    return testset_data, tested_messages
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>> 3. use several GPUs to accelerate inference at testset <<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def init_model(model_path, gpu_id):
    """Load the Qwen2-VL checkpoint and its processor for one GPU.

    Args:
        model_path: Checkpoint path or hub id handed to ``from_pretrained``.
        gpu_id: Index of the CUDA device that receives the whole model.

    Returns:
        A ``(model, processor)`` tuple. The model is loaded in bfloat16 with
        the ``flash_attention_2`` attention implementation on ``cuda:<gpu_id>``;
        the processor's tokenizer is switched to left padding.
    """
    # We recommend enabling flash_attention_2 for better acceleration and memory saving, especially in multi-image and video scenarios.
    model = Qwen2VLForConditionalGeneration.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        attn_implementation="flash_attention_2",
        device_map=f"cuda:{gpu_id}",
    )
    # default processor
    processor = AutoProcessor.from_pretrained(model_path, use_fast=True)
    # Prompts are padded to a common length and generated in batches. A
    # decoder-only model continues from the last position of each row, so the
    # padding has to sit on the left; with right padding the shorter prompts
    # would be continued from pad tokens.
    processor.tokenizer.padding_side = "left"
    return model, processor
def answer_a_batch_question_qwen(batch_messages, model, processor):
    """Generate one decoded answer string per conversation in the batch.

    Args:
        batch_messages: List of chat conversations to answer.
        model: Generation model; inputs are moved to ``model.device``.
        processor: Processor used for the chat template, the padded batch
            encoding and the final decoding.

    Returns:
        List of decoded completions (special tokens skipped), one per row of
        the encoded batch.
    """
    prompts = [
        processor.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True)
        for conversation in batch_messages
    ]
    images, videos = process_vision_info(batch_messages)
    model_inputs = processor(
        text=prompts,
        images=images,
        videos=videos,
        padding=True,
        return_tensors="pt",
    ).to(model.device)

    output_ids = model.generate(**model_inputs, use_cache=True, max_new_tokens=1024)  # do_sample=False

    # Drop the leading prompt-length slice of every output row so that only
    # the newly generated tokens get decoded.
    completions = []
    for prompt_ids, sequence in zip(model_inputs.input_ids, output_ids):
        completions.append(sequence[len(prompt_ids):])

    return processor.batch_decode(
        completions, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
def infer_on_single_gpu(model_path, device_id, chunk_of_tested_messages, batch_size, results=None):
    """Load the model on one GPU and answer the assigned chunk of questions.

    Args:
        model_path: Checkpoint path forwarded to ``init_model``.
        device_id: GPU index; also used as the key in ``results`` and as the
            progress-bar row.
        chunk_of_tested_messages: Conversations this worker has to answer.
        batch_size: Number of conversations sent to the model per call.
        results: Optional mapping (e.g. a ``Manager().dict()``) that receives
            ``results[device_id] = responses``. When omitted, the responses
            are only returned.

    Returns:
        List of response strings in the order of ``chunk_of_tested_messages``.
    """
    model, processor = init_model(model_path, device_id)

    responses = []
    batch_starts = range(0, len(chunk_of_tested_messages), batch_size)
    for start in tqdm.auto.tqdm(batch_starts, desc=f"GPU {device_id} progress", position=device_id, leave=False):
        batch_messages = chunk_of_tested_messages[start: start + batch_size]
        responses.extend(answer_a_batch_question_qwen(batch_messages, model, processor))

    # ``results`` defaults to None; writing to it unconditionally used to
    # raise TypeError only after the whole chunk had been processed.
    if results is not None:
        results[device_id] = responses
    return responses
def _split_evenly(items, num_chunks):
    """Split ``items`` into ``num_chunks`` contiguous slices whose sizes differ by at most one."""
    base, extra = divmod(len(items), num_chunks)
    chunks = []
    start = 0
    for index in range(num_chunks):
        end = start + base + (1 if index < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def multi_gpu_inference(prompts, gpu_ids, model_path, batch_size):
    """Answer ``prompts`` in parallel, one worker process (and model) per GPU.

    Args:
        prompts: Conversations to answer.
        gpu_ids: Comma-separated GPU indices, e.g. ``"0,1,3"``. Blank entries
            are ignored and repeated ids are used once.
        model_path: Checkpoint path forwarded to every worker.
        batch_size: Per-worker batch size; must be at least 1.

    Returns:
        List of responses in the same order as ``prompts``.

    Raises:
        ValueError: If ``gpu_ids`` names no GPU or ``batch_size`` is below 1.
        RuntimeError: If a worker exits abnormally or stores no result.
    """
    # dict.fromkeys de-duplicates while keeping order: results are keyed by
    # GPU id, so a repeated id would overwrite another worker's output.
    device_ids = list(dict.fromkeys(
        int(part.strip()) for part in gpu_ids.split(",") if part.strip()
    ))
    if not device_ids:
        raise ValueError(f"gpu_ids must list at least one GPU id, got {gpu_ids!r}")
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if not prompts:
        # Nothing to answer: do not spawn workers that would only load models.
        return []

    set_start_method("spawn", force=True)

    # Never start more workers than prompts, and spread the remainder over
    # the first workers instead of piling it all onto the last GPU.
    device_ids = device_ids[:len(prompts)]
    chunks = _split_evenly(prompts, len(device_ids))

    with Manager() as manager:
        gpu_id2result = manager.dict()
        processes = []
        for gpu_id, chunk in zip(device_ids, chunks):
            process = Process(target=infer_on_single_gpu, args=(model_path, gpu_id, chunk, batch_size, gpu_id2result))
            process.start()
            processes.append(process)
        for process in processes:
            process.join()

        # A crashed worker leaves no entry behind; report it explicitly
        # instead of failing below with a bare KeyError.
        failed = [
            gpu_id for gpu_id, process in zip(device_ids, processes)
            if process.exitcode != 0 or gpu_id not in gpu_id2result
        ]
        if failed:
            raise RuntimeError(f"Inference worker(s) failed on GPU id(s): {failed}")

        # Concatenate in GPU order, which is the order the chunks were cut in.
        all_predicts = []
        for gpu_id in device_ids:
            all_predicts.extend(gpu_id2result[gpu_id])
    return all_predicts
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# >>>>>>>>>> 4. compute metrics <<<<<<<<<<<
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def compute_metrics(testset_data, all_predicts, output_path=None):
    """Score the predictions against the ground truth and save a JSON report.

    Args:
        testset_data: Test records; each must provide ``ground_truth``.
        all_predicts: Raw model outputs, parallel to ``testset_data``.
        output_path: Destination of the JSON report. When omitted, the
            module-level ``args.output_path`` set by the script entry point
            is used.

    Returns:
        The accuracy in percent (``0.0`` for an empty test set).

    Raises:
        ValueError: If the number of predictions differs from the number of
            test records, since zipping them would misalign or drop samples.
    """
    if output_path is None:
        # Backward compatibility with the script entry point, which keeps the
        # parsed CLI namespace in the module-level ``args``.
        output_path = args.output_path
    if len(testset_data) != len(all_predicts):
        raise ValueError(
            f"Got {len(all_predicts)} predictions for {len(testset_data)} test examples"
        )

    final_output = []
    correct_number = 0
    for input_example, model_output in zip(testset_data, all_predicts):
        ground_truth = input_example['ground_truth']
        model_answer = parse(model_output)
        # str() is a no-op for strings and protects against ground truths
        # that the JSON loader turned into numbers.
        gold_answer = parse(str(ground_truth))

        # verify() is not symmetric: the gold answer goes first. An empty or
        # missing parse result counts as wrong.
        is_correct = bool(model_answer) and bool(verify(gold_answer, model_answer))
        if is_correct:
            correct_number += 1

        if model_answer:
            extracted_answer = str(model_answer[0])
        else:
            print("no answer parsed", model_answer)
            extracted_answer = None

        final_output.append({
            'question': input_example,
            'ground_truth': ground_truth,
            'model_output': model_output,
            'extracted_answer': extracted_answer,
            'is_correct': is_correct
        })

    # Calculate and print accuracy (guarding against an empty test set)
    accuracy = correct_number / len(testset_data) * 100 if len(testset_data) else 0.0
    print(f"\nAccuracy: {accuracy:.2f}%")

    # Save results to a JSON file; ensure_ascii=False emits raw non-ASCII
    # characters, so the encoding is pinned instead of left to the locale.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump({
            'accuracy': accuracy,
            'results': final_output
        }, f, indent=2, ensure_ascii=False)
    print(f"Results saved to {output_path}")
    return accuracy
if __name__ == "__main__":
    # Script entry point: parse options, load the test set, run inference on
    # the selected GPUs, then score and save the predictions.
    # NOTE: `args` and `tested_messages` are kept as module-level names
    # because compute_metrics reads them as globals.
    args = get_eval_config()
    testset_data, tested_messages = prepare_test_messages(args.prompt_path)
    all_predicts = multi_gpu_inference(
        prompts=tested_messages,
        gpu_ids=args.gpu_ids,
        model_path=args.model_path,
        batch_size=args.batch_size,
    )
    compute_metrics(testset_data, all_predicts)