from transformers import Qwen2VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import json
from tqdm.auto import tqdm
from math_verify import parse, verify
import argparse
import pandas as pd
from torch.multiprocessing import Process, set_start_method, Manager
from transformers.utils.logging import disable_progress_bar

# Silence the Hugging Face progress bars so only the per-GPU tqdm bars below are shown.
disable_progress_bar()
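
# Example invocation (script name and paths are placeholders; assumes four visible GPUs):
#   python eval_geoqa_qwen2vl.py \
#       --model_path /path/to/Qwen2-VL-checkpoint \
#       --prompt_path /path/to/geoqa_test.jsonl \
#       --output_path /path/to/results.json \
#       --batch_size 4 \
#       --gpu_ids 0,1,2,3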


def get_eval_config():
    parser = argparse.ArgumentParser(description="Inference script for GeoQA evaluation.")
    parser.add_argument("--model_path", required=True, type=str, help="Path to the model checkpoint (e.g., a base Qwen2-VL model or a fine-tuned checkpoint).")
    parser.add_argument("--batch_size", default=4, type=int, help="Batch size for inference. Reduce this if the GPU runs out of memory (default: 4).")
    parser.add_argument("--output_path", required=True, type=str, help="Path to save the inference results (a JSON file).")
    parser.add_argument("--prompt_path", required=True, type=str, help="Path to the prompts JSONL file for GeoQA evaluation.")
    all_gpu = ",".join(map(str, range(torch.cuda.device_count())))
    parser.add_argument("--gpu_ids", default=all_gpu, help="Comma-separated list of GPU IDs to use.")
    args = parser.parse_args()
    return args
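
# By default --gpu_ids covers every GPU visible to torch; multi_gpu_inference spawns one worker process per id.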


def prepare_test_messages(testset_path):
    testset_data = pd.read_json(testset_path, lines=True).to_dict(orient="records")
    QUESTION_TEMPLATE = "{Question} Output the thinking process in <think> </think> and final answer (number) in <answer> </answer> tags."
    tested_messages = []
    for i in testset_data:
        message = [{
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "image": f"file://{i['image_path']}"
                },
                {
                    "type": "text",
                    "text": QUESTION_TEMPLATE.format(Question=i['question'])
                }
            ]
        }]
        tested_messages.append(message)
    return testset_data, tested_messages
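
# A minimal sketch of one input JSONL record (field names are the ones read above and in
# compute_metrics; the values here are made up):
#   {"image_path": "/data/geoqa/images/0001.png",
#    "question": "As shown in the figure, AB = 3 and AC = 4. What is the length of BC?",
#    "ground_truth": "5"}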


def init_model(model_path, gpu_id):
    """Initialize a model (from model_path) on a specific GPU."""
    model = Qwen2VLForConditionalGeneration.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        attn_implementation="flash_attention_2",
        device_map=f"cuda:{gpu_id}",
    )
    processor = AutoProcessor.from_pretrained(model_path, use_fast=True)
    return model, processor
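
# Note: attn_implementation="flash_attention_2" requires the flash-attn package and a
# compatible GPU; if it is unavailable, drop that argument to fall back to the default attention.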


def answer_a_batch_question_qwen(batch_messages, model, processor):
    """Let Qwen answer a batch of questions."""
    text = [processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True) for msg in batch_messages]
    image_inputs, video_inputs = process_vision_info(batch_messages)
    inputs = processor(
        text=text,
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    inputs = inputs.to(model.device)

    generated_ids = model.generate(**inputs, use_cache=True, max_new_tokens=1024)
    # generate() returns the prompt plus the completion; strip the prompt tokens so only
    # the newly generated answer text is decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    batch_output_text = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
    return batch_output_text
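
# Each decoded string is the model's full response, typically the <think> ... </think> reasoning
# plus the <answer> ... </answer> tag requested by QUESTION_TEMPLATE; compute_metrics later
# parses it with math_verify.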


def infer_on_single_gpu(model_path, device_id, chunk_of_tested_messages, batch_size, results=None):
    """Initialize a model on this single GPU and let it answer its assigned chunk of questions."""
    model, processor = init_model(model_path, device_id)

    responses = []
    batch_messages_list = [chunk_of_tested_messages[start: start + batch_size]
                           for start in range(0, len(chunk_of_tested_messages), batch_size)]

    for batch_messages in tqdm(batch_messages_list, desc=f"GPU {device_id} progress", position=device_id, leave=False):
        batch_output_text = answer_a_batch_question_qwen(batch_messages, model, processor)
        responses.extend(batch_output_text)

    results[device_id] = responses
    return
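
# Each worker reports its outputs through the shared Manager dict passed in as `results`,
# keyed by its GPU id; multi_gpu_inference below reassembles the chunks in gpu_ids order.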


def multi_gpu_inference(prompts, gpu_ids, model_path, batch_size):
    """Let each GPU (along with a model) answer a chunk of questions."""
    # CUDA cannot be re-initialized in a forked child, so the workers must be spawned.
    set_start_method("spawn", force=True)
    manager = Manager()
    gpu_id2result = manager.dict()

    gpu_ids = [int(gpu_id.strip()) for gpu_id in gpu_ids.split(',')]
    num_gpus = len(gpu_ids)

    chunk_size = len(prompts) // num_gpus
    processes = []
    for i, gpu_id in enumerate(gpu_ids):
        start_idx = i * chunk_size
        # The last GPU also takes the remainder when len(prompts) is not divisible by num_gpus.
        end_idx = (i + 1) * chunk_size if i != num_gpus - 1 else len(prompts)
        chunk = prompts[start_idx: end_idx]
        process = Process(target=infer_on_single_gpu, args=(model_path, gpu_id, chunk, batch_size, gpu_id2result))
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    all_predicts = []
    for gpu_id in gpu_ids:
        all_predicts.extend(gpu_id2result[gpu_id])

    return all_predicts
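
# The per-GPU chunks are contiguous slices of `prompts` and are concatenated back in the same
# gpu_ids order, so the i-th prediction still corresponds to the i-th test record.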


def compute_metrics(testset_data, all_predicts, output_path):
    final_output = []
    correct_number = 0

    for input_example, model_output in zip(testset_data, all_predicts):
        original_output = model_output
        ground_truth = input_example['ground_truth']
        model_answer = parse(original_output)

        # math_verify's parse() returns a (possibly empty) list of candidate answers, so check
        # truthiness rather than comparing against None.
        if model_answer and float(verify(model_answer, parse(ground_truth))) > 0:
            correct_number += 1
            is_correct = True
        else:
            is_correct = False

        try:
            result = {
                'question': input_example,
                'ground_truth': ground_truth,
                'model_output': original_output,
                'extracted_answer': str(model_answer[0]) if model_answer else None,
                'is_correct': is_correct
            }
        except Exception as e:
            print("no answer parsed", e, model_answer)
            result = {
                'question': input_example,
                'ground_truth': ground_truth,
                'model_output': original_output,
                'extracted_answer': None,
                'is_correct': is_correct
            }

        final_output.append(result)

    accuracy = correct_number / len(testset_data) * 100
    print(f"\nAccuracy: {accuracy:.2f}%")

    with open(output_path, "w") as f:
        json.dump({
            'accuracy': accuracy,
            'results': final_output
        }, f, indent=2, ensure_ascii=False)

    print(f"Results saved to {output_path}")


if __name__ == "__main__":
    args = get_eval_config()
    testset_data, tested_messages = prepare_test_messages(testset_path=args.prompt_path)
    all_predicts = multi_gpu_inference(tested_messages, args.gpu_ids, args.model_path, args.batch_size)
    compute_metrics(testset_data, all_predicts, args.output_path)