# tools/utils/json/MedQA_jtj.py
# Adinosaur's picture
# Upload folder using huggingface_hub
# 1c980b1 verified
import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def _build_record(index, item, cfg):
    """Map one raw QA entry onto the flattened output schema.

    Args:
        index: Position of the entry within the input file.
        item: Raw entry; must provide 'image_path', 'question' and
            'gt_answer', and may provide 'option_A' .. 'option_D'.
        cfg: Effective configuration with 'task_type' and 'source' keys.

    Returns:
        A dict in the flattened output format.

    Raises:
        KeyError: If a required key is missing from ``item``.
    """
    # Keep only what follows the first 'Images/' and re-root it under the
    # configured source name; a path without 'Images/' is kept whole.
    img_path = item['image_path'].split('Images/', 1)[-1]
    media_path = f"./{cfg['source']}/{img_path}"

    # Collect the non-empty options among A-D. Values are coerced with str()
    # so that non-string options (e.g. numbers) do not crash on .strip() and
    # are compared on the same footing as gt_answer below.
    options = []
    for opt_id in ('A', 'B', 'C', 'D'):
        if text := item.get(f'option_{opt_id}', ''):
            options.append({"id": opt_id, "text": str(text).strip()})

    # The ground truth is given as option text; translate it to option ids.
    gt_answer = str(item['gt_answer']).strip()
    correct_ids = [opt['id'] for opt in options if opt['text'] == gt_answer]

    return {
        "index": index,
        "media_type": "image",
        "media_paths": media_path,
        "description": "",
        "task_type": cfg['task_type'],
        "question": [item['question']],
        "question_type": "multi-choice",
        "options": options,
        "annotations": [],
        "answer": correct_ids,
        "source": cfg['source'],
        "domain": "Biomedical"
    }


def convert_medical_json(input_file, output_file, config=None):
    """Convert a medical QA JSON file into the flattened output format.

    The input may hold either a list of entries or a single entry object.
    Each entry is rewritten into a multi-choice record whose answer is the
    list of option ids whose text equals the entry's ground-truth answer.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write.
        config: Optional overrides for the defaults
            ``{"task_type": "Visual_Question_Answering",
            "source": "OmniMedVQA"}``.

    Returns:
        True if the file was converted and written, False if any error
        occurred (the error is printed and no exception propagates).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "OmniMedVQA"
    }
    cfg = {**default_config, **(config or {})}
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # Treat a lone object as a one-element list.
        entries = raw_data if isinstance(raw_data, list) else [raw_data]
        converted = [
            _build_record(index, item, cfg)
            for index, item in enumerate(entries)
        ]

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        # Boundary handler: callers rely on the boolean result to count
        # per-file failures, so report the error and signal failure.
        print(f"转换失败: {input_file}{str(e)}")
        return False
def process_single_file(input_path, output_dir, config):
    """Convert a single JSON file into the flat output directory.

    Args:
        input_path: Path object of the JSON file to convert.
        output_dir: Path object of the directory receiving the result.
        config: Configuration overrides forwarded to the converter.

    Returns:
        The converter's boolean result, or False if an exception was
        raised while preparing or running the conversion.
    """
    try:
        # Flat layout: the result keeps only the original file name and is
        # placed directly inside the output directory.
        target = output_dir / input_path.name
        succeeded = convert_medical_json(
            input_file=str(input_path),
            output_file=str(target),
            config=config,
        )
    except Exception as exc:
        print(f"文件处理异常: {input_path}{exc}")
        return False
    else:
        return succeeded
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every JSON file under ``input_dir`` into a flat ``output_dir``.

    The input tree is walked recursively; each file whose name ends with
    '.json' (case-insensitive) is handed to ``process_single_file`` on a
    thread pool. A summary of successes and failures is printed at the end.

    Args:
        input_dir: Root directory to search for JSON files.
        output_dir: Directory receiving the converted files; created
            (including parents) if it does not exist.
        config: Optional configuration forwarded to each conversion.
        max_workers: Size of the thread pool.

    Raises:
        FileNotFoundError: If ``input_dir`` does not exist.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    # Validate the input before touching the filesystem so that a bad input
    # path does not leave a stray, empty output directory behind.
    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")

    # The output directory only needs to be created once for all workers.
    output_path.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        # Walk the whole tree; the directory structure is ignored on output.
        # NOTE: because process_single_file writes to output_dir/<file name>,
        # equally named files from different subdirectories share one
        # output path.
        for root, _, files in os.walk(input_dir):
            for filename in files:
                if filename.lower().endswith('.json'):
                    input_file = Path(root) / filename
                    futures.append(
                        executor.submit(
                            process_single_file,
                            input_path=input_file,
                            output_dir=output_path,
                            config=config
                        )
                    )

        # Tally results in submission order.
        for future in futures:
            if future.result():
                success_count += 1
            else:
                failure_count += 1

    print(f"\n处理完成: 成功 {success_count} 个文件,失败 {failure_count} 个文件")
    print(f"输出目录: {output_path.resolve()}")
if __name__ == "__main__":
    # Script entry point: convert the OmniMedVQA QA files with the settings
    # below and report any error that aborts the batch.
    custom_config = {
        "source": "OmniMedVQA",
        "task_type": "Visual_Question_Answering"
    }
    try:
        # os.cpu_count() may return None when the count cannot be
        # determined; fall back to 1 so the multiplication cannot fail.
        worker_count = min((os.cpu_count() or 1) * 2, 32)
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/biomedical/medvqa/OmniMedVQA/QA_information",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/biomedical/medvqa",
            config=custom_config,
            max_workers=worker_count
        )
    except Exception as e:
        print(f"批量处理异常终止: {str(e)}")