# tools/utils/json/emb_jtj.py
# Uploaded via huggingface_hub (revision 1c980b1, verified)
import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def convert_medical_json(input_file, output_file, config=None):
    """Convert a raw EmbSpatial-style JSON file into the unified VQA schema.

    Parameters
    ----------
    input_file : str or Path
        Path to the raw JSON file (either a list of items or a single
        item dict).
    output_file : str or Path
        Destination path for the converted JSON list.
    config : dict, optional
        Overrides for the default metadata fields
        ("task_type", "source", "domain").

    Returns
    -------
    bool
        True on success; False if reading, converting, or writing failed
        (the error is printed, never raised).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai"
    }
    # Caller-supplied values take precedence over the defaults.
    cfg = {**default_config, **(config or {})}
    input_path = Path(input_file)
    file_stem = input_path.stem
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        converted = []
        # Accept either a list of items or a single item dict.
        for index, item in enumerate(raw_data if isinstance(raw_data, list) else [raw_data]):
            # Rebuild the media path as ./data/<file_stem>/<index>.jpg.
            media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()
            # Build option entries labelled 'A', 'B', ... (65 is ASCII 'A').
            options = []
            for idx, option_text in enumerate(item['answer_options']):
                opt_id = chr(65 + idx)
                options.append({"id": opt_id, "text": option_text.strip()})
            # The raw answer is a numeric index into the options list;
            # any malformed or out-of-range value yields an empty answer.
            try:
                answer_num = int(item['answer'])
                answer_ids = [options[answer_num]['id']] if 0 <= answer_num < len(options) else []
            except (ValueError, IndexError, KeyError):
                answer_ids = []
            # Extract bounding-box annotations from each item's object list.
            annotations = []
            for obj in item.get("objects", []):
                annotation = {
                    "bbox": obj.get("bbox", []),
                    "segmentation": {
                        "size": [],   # no corresponding source data; left empty
                        "counts": ""  # no corresponding source data; empty string
                    },
                    "category_name": obj.get("name", "")
                }
                annotations.append(annotation)
            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": str(item.get('relation', "")),
                "task_type": cfg['task_type'],
                "question": [item.get('question', "")],
                "question_type": "multi-choice",
                "options": options,
                "annotations": annotations,
                "answer": answer_ids,
                "source": cfg['source'],
                "domain": cfg['domain']
            })
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        # BUG FIX: the original message concatenated the path and the error
        # text with no separator; insert " - " so the log line is readable.
        print(f"转换失败: {input_file} - {str(e)}")
        return False
def process_single_file(input_path, output_dir, config):
    """Convert one JSON file into a flat output directory.

    The destination keeps the original file name (no subdirectories).
    Returns True on success, False on any failure; errors are printed
    rather than propagated so one bad file cannot abort a batch run.
    """
    try:
        # Flat layout: <output_dir>/<original name>.
        destination = output_dir / input_path.name
        return convert_medical_json(
            input_file=str(input_path),
            output_file=str(destination),
            config=config,
        )
    except Exception as e:
        print(f"文件处理异常: {input_path}{str(e)}")
        return False
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Flat batch converter: process every top-level *.json in input_dir.

    Files are converted concurrently with a thread pool and written into
    output_dir (created if needed). A per-file success/failure tally is
    printed at the end.

    Raises FileNotFoundError if input_dir does not exist.
    """
    src = Path(input_dir)
    dst = Path(output_dir)
    # Create the output directory up front; doing it once is enough.
    dst.mkdir(parents=True, exist_ok=True)
    if not src.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")
    # Only JSON files directly inside the input directory (no recursion).
    json_files = [p for p in src.glob('*.json') if p.is_file()]
    successes, failures = 0, 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [
            pool.submit(
                process_single_file,
                input_path=path,
                output_dir=dst,
                config=config,
            )
            for path in json_files
        ]
        # Tally the results as each conversion completes.
        for task in pending:
            if task.result():
                successes += 1
            else:
                failures += 1
    print(f"\n处理完成: 成功 {successes} 个文件,失败 {failures} 个文件")
    print(f"输出目录: {dst.resolve()}")
if __name__ == "__main__":
custom_config = {
"source": "EmbSpatial",
"task_type": "Object_Detection",
"domain": "Embodied_ai"
}
try:
batch_convert_json(
input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai",
output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
config=custom_config,
max_workers=min(os.cpu_count() * 2, 32)
)
except Exception as e:
print(f"批量处理异常终止: {str(e)}")