# tools/utils/json/MMR.py
# Adinosaur's picture
# Upload folder using huggingface_hub
# 1c980b1 verified
import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def convert_medical_json(input_file, output_file, config=None):
    """Convert a raw annotation JSON file into the unified record format.

    Parameters
    ----------
    input_file : str or Path
        Path to the source JSON; either a list of items or a single item dict.
        Each item is expected to carry ``file_name`` plus optional
        ``questions`` / ``raw_answers`` / ``answers`` fields.
    output_file : str or Path
        Destination path for the converted JSON list.
    config : dict or None
        Optional overrides for the ``task_type`` / ``source`` / ``domain``
        defaults below.

    Returns
    -------
    bool
        True on success; False if reading, converting, or writing failed
        (the error is printed, not raised).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai",
    }
    cfg = {**default_config, **(config or {})}
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        # Normalize to a list so single-object files are handled uniformly.
        items = raw_data if isinstance(raw_data, list) else [raw_data]
        converted = []
        for index, item in enumerate(items):
            # Rebuild the media path as "./data/<source>/<file_name>"
            # using POSIX separators regardless of host OS.
            media_path = "./" + (Path("data") / cfg['source'] / item['file_name']).as_posix()
            # Preserve the original list-of-lists "answers" structure,
            # keeping only the three fields the downstream format expects.
            annotations = [
                [
                    {
                        "bbox": answer.get("bbox", []),
                        "segmentation": answer.get("segmentation", {}),
                        "category_name": answer.get("category_name", ""),
                    }
                    for answer in answer_group
                ]
                for answer_group in item.get("answers", [])
            ]
            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": "",
                "task_type": cfg['task_type'],
                "question": item.get('questions', []),  # list taken as-is
                "question_type": "detection-form",
                "options": [],
                "annotations": annotations,  # list-of-lists, mirrors input
                "answer": item.get('raw_answers', []),  # list taken as-is
                "source": cfg['source'],
                "domain": cfg['domain'],
            })
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        # BUG FIX: the original message glued the file name and the error
        # text together with no separator; add one for readable diagnostics.
        print(f"转换失败: {input_file}: {e}")
        return False
def process_single_file(input_path, output_dir, config):
    """Convert one JSON file, writing the result flat into *output_dir*.

    The output keeps only the source file's base name (no subdirectories).
    Returns the bool result of the conversion, or False on any error.
    """
    try:
        target = output_dir / input_path.name  # flat layout: dir + basename
        ok = convert_medical_json(
            input_file=str(input_path),
            output_file=str(target),
            config=config,
        )
        return ok
    except Exception as exc:
        print(f"文件处理异常: {input_path}{str(exc)}")
        return False
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every top-level ``*.json`` in *input_dir* into *output_dir* (flat).

    Files are processed concurrently with a thread pool (the work is
    I/O-bound file read/write). Only the input directory's immediate
    children are scanned — no recursion.

    Parameters
    ----------
    input_dir : str or Path — directory containing the source JSON files.
    output_dir : str or Path — created if missing; receives converted files.
    config : dict or None — forwarded to each per-file conversion.
    max_workers : int — thread pool size.

    Returns
    -------
    tuple[int, int]
        (success_count, failure_count). (New, backward-compatible return —
        the original returned None and printed the counts only.)

    Raises
    ------
    FileNotFoundError
        If *input_dir* does not exist.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    # BUG FIX: validate the input directory BEFORE creating the output one,
    # so a bad invocation does not leave an empty output directory behind.
    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")
    output_path.mkdir(parents=True, exist_ok=True)
    success_count = 0
    failure_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Scan only the current directory's JSON files (no recursion);
        # is_file() guards against directories named "*.json".
        futures = [
            executor.submit(
                process_single_file,
                input_path=json_file,
                output_dir=output_path,
                config=config,
            )
            for json_file in input_path.glob('*.json')
            if json_file.is_file()
        ]
        for future in futures:
            if future.result():
                success_count += 1
            else:
                failure_count += 1
    print(f"\n处理完成: 成功 {success_count} 个文件,失败 {failure_count} 个文件")
    print(f"输出目录: {output_path.resolve()}")
    return success_count, failure_count
if __name__ == "__main__":
custom_config = {
"source": "MMR",
"task_type": "Multi-Format-Task",
"domain": "General"
}
try:
batch_convert_json(
input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/general/mmr",
output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
config=custom_config,
max_workers=os.cpu_count() * 2
)
except Exception as e:
print(f"批量处理异常终止: {str(e)}")