File size: 5,002 Bytes
1c980b1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def convert_medical_json(input_file, output_file, config=None):
    """Convert a raw EmbSpatial-style JSON file into the unified VQA schema.

    Args:
        input_file: Path to the source JSON file. Its content may be a single
            item dict or a list of item dicts; each item is expected to carry
            'answer_options' and 'answer', with optional 'question',
            'relation', and 'objects' keys.
        output_file: Path where the converted JSON list is written.
        config: Optional dict overriding 'task_type', 'source', 'domain'.

    Returns:
        True on success, False on any failure (the error is printed).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai"
    }
    cfg = {**default_config, **(config or {})}
    input_path = Path(input_file)
    file_stem = input_path.stem
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        converted = []
        items = raw_data if isinstance(raw_data, list) else [raw_data]
        for index, item in enumerate(items):
            # Rebuild the media path as ./data/<file_stem>/<index>.jpg.
            media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()
            # Label options 'A', 'B', ... (chr(65) == 'A'); more than 26
            # options would run past 'Z' — assumed not to occur in this data.
            options = [
                {"id": chr(65 + idx), "text": option_text.strip()}
                for idx, option_text in enumerate(item['answer_options'])
            ]
            try:
                answer_num = int(item['answer'])
                answer_ids = [options[answer_num]['id']] if 0 <= answer_num < len(options) else []
            except (ValueError, TypeError, IndexError, KeyError):
                # TypeError added: int(None) / int([...]) would otherwise
                # escape this handler and abort the whole file for one bad
                # item; an unresolvable answer simply yields an empty list.
                answer_ids = []
            # One annotation per source object; segmentation data is not
            # present in the source, so it is left empty.
            annotations = [
                {
                    "bbox": obj.get("bbox", []),
                    "segmentation": {"size": [], "counts": ""},
                    "category_name": obj.get("name", "")
                }
                for obj in item.get("objects", [])
            ]
            converted.append({
                "index": index,
                "media_type": "image",
                # NOTE(review): plural key but a single string value —
                # downstream consumers appear to expect this shape; confirm
                # before changing it to a list.
                "media_paths": media_path,
                "description": str(item.get('relation', "")),
                "task_type": cfg['task_type'],
                "question": [item.get('question', "")],
                "question_type": "multi-choice",
                "options": options,
                "annotations": annotations,
                "answer": answer_ids,
                "source": cfg['source'],
                "domain": cfg['domain']
            })
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        print(f"转换失败: {input_file} → {str(e)}")
        return False
def process_single_file(input_path, output_dir, config):
    """Convert one JSON file, writing the result flat into *output_dir*.

    The output keeps the original file name; the source directory tree is
    not reproduced. Returns True on success, False on any failure.
    """
    try:
        # Flat layout: output path = output dir + the source file's name.
        target = output_dir / input_path.name
        return convert_medical_json(
            input_file=str(input_path),
            output_file=str(target),
            config=config,
        )
    except Exception as e:
        print(f"文件处理异常: {input_path} → {str(e)}")
        return False
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every top-level ``*.json`` file in *input_dir* concurrently.

    Output files are written flat into *output_dir* under their original
    names; sub-directories of *input_dir* are not traversed.

    Args:
        input_dir: Directory containing the source JSON files.
        output_dir: Directory for the converted files (created if missing).
        config: Optional overrides passed through to the converter.
        max_workers: Thread-pool size (the work is I/O-bound).

    Returns:
        Tuple ``(success_count, failure_count)``.

    Raises:
        FileNotFoundError: If *input_dir* does not exist.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    # Validate the input BEFORE creating the output directory, so a bad
    # input path does not leave an empty output directory behind.
    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")
    output_path.mkdir(parents=True, exist_ok=True)
    success_count = 0
    failure_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                process_single_file,
                input_path=input_file,
                output_dir=output_path,
                config=config,
            )
            for input_file in input_path.glob('*.json')
            if input_file.is_file()  # glob can also match dirs named *.json
        ]
        for future in futures:
            if future.result():
                success_count += 1
            else:
                failure_count += 1
    print(f"\n处理完成: 成功 {success_count} 个文件,失败 {failure_count} 个文件")
    print(f"输出目录: {output_path.resolve()}")
    return success_count, failure_count
if __name__ == "__main__":
    # Per-run overrides of the converter defaults.
    custom_config = {
        "source": "EmbSpatial",
        "task_type": "Object_Detection",
        "domain": "Embodied_ai"
    }
    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            # os.cpu_count() may return None; fall back to 1 before scaling
            # so the multiplication cannot raise TypeError.
            max_workers=min((os.cpu_count() or 1) * 2, 32)
        )
    except Exception as e:
        print(f"批量处理异常终止: {str(e)}")