import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor


def convert_medical_json(input_file, output_file, config=None):
    """Convert one Embspatial-style QA JSON file into the unified VQA schema.

    Args:
        input_file: Path to the source JSON (a list of items, or one item dict).
        output_file: Path where the converted JSON list is written.
        config: Optional dict overriding ``task_type`` / ``source`` / ``domain``.

    Returns:
        True on success, False on any failure. Errors are printed rather than
        raised so that batch processing can continue with the remaining files.
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai",
    }
    cfg = {**default_config, **(config or {})}

    input_path = Path(input_file)
    file_stem = input_path.stem

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # Normalize to a list so single-item files are handled uniformly.
        items = raw_data if isinstance(raw_data, list) else [raw_data]

        converted = []
        for index, item in enumerate(items):
            # Rebuild the media path as ./data/<file_stem>/<index>.jpg
            # (POSIX separators regardless of host OS).
            media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()

            # Build the option structure: ids 'A', 'B', ... (ASCII 65 = 'A').
            options = []
            for idx, option_text in enumerate(item['answer_options']):
                opt_id = chr(65 + idx)
                options.append({"id": opt_id, "text": option_text.strip()})

            # The raw answer is a numeric index into the options; map it to
            # the letter id. Malformed / out-of-range answers become [].
            try:
                answer_num = int(item['answer'])
                answer_ids = (
                    [options[answer_num]['id']]
                    if 0 <= answer_num < len(options)
                    else []
                )
            except (ValueError, IndexError, KeyError):
                answer_ids = []

            # Extract per-item object annotations (bbox + category name).
            annotations = []
            for obj in item.get("objects", []):
                annotations.append({
                    "bbox": obj.get("bbox", []),
                    "segmentation": {
                        "size": [],     # no corresponding source data
                        "counts": ""    # no corresponding source data
                    },
                    "category_name": obj.get("name", ""),
                })

            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": str(item.get('relation', "")),
                "task_type": cfg['task_type'],
                "question": [item.get('question', "")],
                "question_type": "multi-choice",
                "options": options,
                "annotations": annotations,
                "answer": answer_ids,
                "source": cfg['source'],
                "domain": cfg['domain'],
            })

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True

    except Exception as e:
        # Best-effort boundary: report and let the batch continue.
        print(f"转换失败: {input_file} → {str(e)}")
        return False


def process_single_file(input_path, output_dir, config):
    """Convert a single file, writing a flat output (output dir + original name).

    Returns True/False mirroring convert_medical_json; never raises.
    """
    try:
        output_file = output_dir / input_path.name
        return convert_medical_json(
            input_file=str(input_path),
            output_file=str(output_file),
            config=config,
        )
    except Exception as e:
        print(f"文件处理异常: {input_path} → {str(e)}")
        return False


def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Flat batch converter: process every top-level *.json in ``input_dir``.

    Args:
        input_dir: Directory containing the source JSON files (not recursive).
        output_dir: Directory for converted files; created if missing.
        config: Optional config overrides forwarded to convert_medical_json.
        max_workers: Thread-pool size (conversion is I/O-bound).

    Raises:
        FileNotFoundError: If ``input_dir`` does not exist.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    # Validate the input before touching the filesystem, so a bad input_dir
    # does not leave behind an empty output directory.
    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")

    # Create the output directory once, up front.
    output_path.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        # Only scan the top level of the input directory for JSON files.
        for input_file in input_path.glob('*.json'):
            if input_file.is_file():  # skip directories named *.json
                futures.append(
                    executor.submit(
                        process_single_file,
                        input_path=input_file,
                        output_dir=output_path,
                        config=config,
                    )
                )

        # Tally results; future.result() is True/False, never an exception,
        # because process_single_file catches everything.
        for future in futures:
            if future.result():
                success_count += 1
            else:
                failure_count += 1

    print(f"\n处理完成: 成功 {success_count} 个文件,失败 {failure_count} 个文件")
    print(f"输出目录: {output_path.resolve()}")


if __name__ == "__main__":
    custom_config = {
        "source": "EmbSpatial",
        "task_type": "Object_Detection",
        "domain": "Embodied_ai",
    }

    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            # os.cpu_count() can return None; fall back to 4 cores to avoid
            # a TypeError in None * 2.
            max_workers=min((os.cpu_count() or 4) * 2, 32),
        )
    except Exception as e:
        print(f"批量处理异常终止: {str(e)}")