| | import json |
| | import os |
| | from pathlib import Path |
| | from concurrent.futures import ThreadPoolExecutor |
| |
|
def convert_medical_json(input_file, output_file, config=None):
    """Convert one medical QA JSON file into the flattened output schema.

    The input file may contain a single record (a JSON object) or a list of
    records. Each record must provide ``image_path``, ``question`` and
    ``gt_answer``; the choices are read from the optional ``option_A`` ..
    ``option_D`` keys.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write (overwritten if present).
        config: Optional dict overriding ``task_type`` and/or ``source``.

    Returns:
        True when the output file was written, False when any error occurred
        (the error is printed, never raised).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "OmniMedVQA",
    }
    cfg = {**default_config, **(config or {})}

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # A lone record is handled like a one-element list.
        records = raw_data if isinstance(raw_data, list) else [raw_data]

        converted = []
        for index, item in enumerate(records):
            # Keep only the part after the first "Images/" segment and
            # re-root it under the configured source name.
            img_path = item['image_path'].split('Images/', 1)[-1]
            media_path = f"./{cfg['source']}/{img_path}"

            options = []
            for opt_id in ('A', 'B', 'C', 'D'):
                raw_text = item.get(f'option_{opt_id}')
                if raw_text is None:
                    continue
                # Coerce to str the same way gt_answer is coerced below, so
                # numeric choices (including 0) neither crash on .strip()
                # nor get dropped by a truthiness test.
                text = str(raw_text).strip()
                if text:  # blank choices are treated as absent
                    options.append({"id": opt_id, "text": text})

            # The answer is stored as the ids of every option whose text
            # equals the ground-truth answer text.
            gt_answer = str(item['gt_answer']).strip()
            correct_ids = [opt['id'] for opt in options if opt['text'] == gt_answer]

            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": "",
                "task_type": cfg['task_type'],
                "question": [item['question']],
                "question_type": "multi-choice",
                "options": options,
                "annotations": [],
                "answer": correct_ids,
                "source": cfg['source'],
                "domain": "Biomedical",
            })

        # Written only after every record converted, so a bad record does
        # not leave a partially converted output file behind.
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True

    except Exception as e:
        # Deliberate best-effort boundary: report and signal failure so a
        # batch run can continue with the remaining files.
        print(f"转换失败: {input_file} → {str(e)}")
        return False
| |
|
def process_single_file(input_path, output_dir, config):
    """Convert one input file into ``output_dir`` under its own file name.

    The output path is ``output_dir / input_path.name``, i.e. any directory
    structure of the input is dropped. Returns whatever
    ``convert_medical_json`` returns, or False if anything raises here (the
    error is printed, not propagated).
    """
    try:
        target = output_dir / input_path.name
        outcome = convert_medical_json(str(input_path), str(target), config)
    except Exception as err:
        print(f"文件处理异常: {input_path} → {err!s}")
        return False
    return outcome
| |
|
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every ``*.json`` file under ``input_dir`` into ``output_dir``.

    The input tree is walked recursively and the results are flattened: each
    output file is written directly into ``output_dir`` under the input
    file's own name. Files are converted concurrently by a thread pool, and
    a success/failure summary is printed at the end.

    Args:
        input_dir: Directory to search recursively for ``.json`` files
            (extension matched case-insensitively).
        output_dir: Directory receiving the converted files; created if
            missing.
        config: Optional config dict forwarded to each conversion.
        max_workers: Size of the thread pool.

    Raises:
        FileNotFoundError: If ``input_dir`` is not an existing directory.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    # Validate before touching the filesystem so a bad input path does not
    # leave an empty output directory behind. is_dir() also rejects a plain
    # file, which os.walk would otherwise silently treat as "no files".
    if not input_path.is_dir():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")

    output_path.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0

    # Finish the walk before any conversion starts, so files written by the
    # workers can never be picked up mid-walk. Sorting makes the traversal
    # order (and therefore which duplicate wins below) deterministic.
    first_seen = {}
    pending = []
    for root, dirs, files in os.walk(input_path):
        dirs.sort()
        for filename in sorted(files):
            if not filename.lower().endswith('.json'):
                continue
            input_file = Path(root) / filename
            earlier = first_seen.setdefault(filename, input_file)
            if earlier is not input_file:
                # Flattened output: two inputs with the same name would be
                # written to the same path by different threads. Keep the
                # first one and report the rest instead of overwriting.
                print(f"文件名冲突,已跳过: {input_file} (与 {earlier} 输出同名)")
                failure_count += 1
                continue
            pending.append(input_file)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                process_single_file,
                input_path=input_file,
                output_dir=output_path,
                config=config,
            )
            for input_file in pending
        ]

        for future in futures:
            if future.result():
                success_count += 1
            else:
                failure_count += 1

    print(f"\n处理完成: 成功 {success_count} 个文件,失败 {failure_count} 个文件")
    print(f"输出目录: {output_path.resolve()}")
| |
|
if __name__ == "__main__":
    # Script entry point: convert the OmniMedVQA QA files at the paths below.
    custom_config = {
        "source": "OmniMedVQA",
        "task_type": "Visual_Question_Answering",
    }

    # os.cpu_count() returns None when the CPU count cannot be determined;
    # fall back to 1 so the multiplication cannot raise TypeError.
    worker_count = min((os.cpu_count() or 1) * 2, 32)

    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/biomedical/medvqa/OmniMedVQA/QA_information",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/biomedical/medvqa",
            config=custom_config,
            max_workers=worker_count,
        )
    except Exception as e:
        # Top-level boundary: report the reason instead of a traceback.
        print(f"批量处理异常终止: {str(e)}")