|
|
import json |
|
|
import os |
|
|
from pathlib import Path |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
def convert_medical_json(input_file, output_file, config=None):
    """Convert a raw annotation JSON file into the unified VQA record format.

    Reads *input_file* (a JSON list of items, or a single item dict), maps
    each item to a flat record (media path, question, annotations, answer,
    task metadata), and writes the converted list to *output_file*.

    Args:
        input_file: Path to the source JSON file.
        output_file: Path the converted JSON list is written to.
        config: Optional dict overriding the default "task_type", "source"
            and "domain" values.

    Returns:
        True on success, False if reading, converting, or writing failed
        (the failure is printed, not raised).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai",
    }
    cfg = {**default_config, **(config or {})}

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        items = raw_data if isinstance(raw_data, list) else [raw_data]
        converted = []
        for index, item in enumerate(items):
            # Media paths are rewritten relative to ./data/<source>/ using
            # POSIX separators so the output is platform independent.
            media_path = (Path("data") / cfg['source'] / item['file_name']).as_posix()
            media_path = f"./{media_path}"

            # Each answer group becomes a list of normalized annotation
            # dicts; missing fields default to empty containers.
            annotations = [
                [
                    {
                        "bbox": answer.get("bbox", []),
                        "segmentation": answer.get("segmentation", {}),
                        "category_name": answer.get("category_name", ""),
                    }
                    for answer in answer_group
                ]
                for answer_group in item.get("answers", [])
            ]

            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": "",
                "task_type": cfg['task_type'],
                "question": item.get('questions', []),
                "question_type": "detection-form",
                "options": [],
                "annotations": annotations,
                "answer": item.get('raw_answers', []),
                "source": cfg['source'],
                "domain": cfg['domain'],
            })

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True

    # Narrowed from a bare `except Exception`: only failures this function
    # can realistically produce (I/O errors, malformed JSON, missing keys,
    # wrong item/answer structure) are converted into a False return;
    # genuine programming errors now surface instead of being swallowed.
    except (OSError, json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
        print(f"转换失败: {input_file} → {str(e)}")
        return False
|
|
|
|
|
def process_single_file(input_path, output_dir, config):
    """Convert one JSON file, writing the result flat into *output_dir*.

    Returns the boolean result of the underlying conversion; any unexpected
    error is logged and reported as False so batch callers keep running.
    """
    try:
        # Flat layout: only the file name is kept, any source sub-directory
        # structure is intentionally dropped.
        target = output_dir / input_path.name
        return convert_medical_json(
            input_file=str(input_path),
            output_file=str(target),
            config=config,
        )
    except Exception as exc:
        print(f"文件处理异常: {input_path} → {str(exc)}")
        return False
|
|
|
|
|
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """扁平化批量处理器 — convert every top-level *.json in *input_dir*.

    Files are converted concurrently by a thread pool and written flat
    (no sub-directories) into *output_dir*. A per-file summary is printed
    at the end.

    Args:
        input_dir: Directory containing the source JSON files.
        output_dir: Directory converted files are written to (created if
            it does not exist).
        config: Optional override dict forwarded to each conversion.
        max_workers: Thread-pool size; the work is I/O-bound, so threads
            are appropriate despite the GIL.

    Raises:
        FileNotFoundError: If *input_dir* does not exist.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    # Validate the input BEFORE touching the filesystem: the original
    # created the output directory first, leaving an empty junk directory
    # behind whenever the input path was wrong.
    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")

    output_path.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                process_single_file,
                input_path=input_file,
                output_dir=output_path,
                config=config,
            )
            for input_file in input_path.glob('*.json')
            if input_file.is_file()
        ]

        # process_single_file traps its own exceptions, so result() only
        # yields the boolean success flag and never re-raises.
        for future in futures:
            if future.result():
                success_count += 1
            else:
                failure_count += 1

    print(f"\n处理完成: 成功 {success_count} 个文件,失败 {failure_count} 个文件")
    print(f"输出目录: {output_path.resolve()}")
|
|
|
|
|
if __name__ == "__main__":
    # Per-dataset overrides for the conversion defaults.
    custom_config = {
        "source": "MMR",
        "task_type": "Multi-Format-Task",
        "domain": "General"
    }

    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/general/mmr",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            # os.cpu_count() may return None; fall back to 1 so the worker
            # count is always a valid int (the original `os.cpu_count() * 2`
            # would raise TypeError in that case).
            max_workers=(os.cpu_count() or 1) * 2
        )
    except Exception as e:
        print(f"批量处理异常终止: {str(e)}")