"""Convert raw question/answer JSON files into a unified VQA-style record format."""
import json
import os
import re
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

# Patterns are compiled once at import time instead of on every record.
_WHITESPACE_RE = re.compile(r'\s+')
_OPTION_SPLIT_RE = re.compile(r'[\n;]')
_BARE_OPTION_RE = re.compile(r'^[^:\.]+$')
_LABELED_OPTION_RE = re.compile(r'^([A-Za-z])[\.:]\s*(.+)$')
_PUNCTUATION_RE = re.compile(r'[^\w\s]')


def _collapse_ws(text):
    """Replace every run of whitespace in *text* with a single space."""
    return _WHITESPACE_RE.sub(' ', text)


def _parse_question(question):
    """Split a raw question string into ``(question_text, question_type, options)``.

    A question containing the marker ``Options:`` is treated as multi-choice:
    the text before the marker is the question, the text after it is split on
    newlines and semicolons into individual options. Each option is either
    labeled (``A. text`` / ``A: text``) or bare, in which case an ID is
    generated sequentially starting from ``A``. Anything else is a free-form
    question with no options.
    """
    if "Options:" not in question:
        return _collapse_ws(question).strip(), "free-form", []

    question_part, _, choices_part = question.partition("Options:")
    question_text = _collapse_ws(question_part).strip()

    options = []
    next_auto_id = ord('A')  # only advanced when an ID has to be generated
    for chunk in _OPTION_SPLIT_RE.split(choices_part.strip()):
        chunk = chunk.strip()
        if not chunk:
            continue

        # Only try the labeled form when the chunk contains ':' or '.';
        # a bare chunk always gets a generated ID.
        labeled = None
        if not _BARE_OPTION_RE.match(chunk):
            labeled = _LABELED_OPTION_RE.match(chunk)

        if labeled:
            opt_id, opt_text = labeled.groups()
            options.append({
                "id": opt_id.upper(),
                "text": _collapse_ws(opt_text.strip()),
            })
        else:
            options.append({
                "id": chr(next_auto_id),
                "text": _collapse_ws(chunk),
            })
            next_auto_id += 1

    return question_text, "multi-choice", options


def _match_answer(raw_answer, options_list):
    """Resolve a raw answer to an option ID where possible.

    Matching is attempted in this order:

    1. the answer equals an option ID (case-insensitive);
    2. the answer equals an option text (case-insensitive);
    3. the answer is a single letter -> returned upper-cased as an ID;
    4. the answer, with punctuation removed, is a substring of an option text.

    If nothing matches, the stripped raw answer is returned unchanged. An
    empty answer yields ``""``.
    """
    raw_answer = str(raw_answer).strip()
    if not raw_answer:
        return ""

    id_map = {opt['id'].upper(): opt['id'] for opt in options_list}
    if raw_answer.upper() in id_map:
        return id_map[raw_answer.upper()]

    text_to_id = {opt['text'].lower(): opt['id'] for opt in options_list}
    if raw_answer.lower() in text_to_id:
        return text_to_id[raw_answer.lower()]

    # A lone letter is an option ID, not a text fragment. Checking this before
    # the substring step stops e.g. "e" from matching whichever option text
    # happens to contain the letter "e".
    if len(raw_answer) == 1 and raw_answer.isalpha():
        return raw_answer.upper()

    clean_answer = _PUNCTUATION_RE.sub('', raw_answer).lower()
    # An empty needle is a substring of every string, so a punctuation-only
    # answer would otherwise always resolve to the first option.
    if clean_answer.strip():
        for opt in options_list:
            if clean_answer in _PUNCTUATION_RE.sub('', opt['text']).lower():
                return opt['id']

    return raw_answer


def _convert_item(index, item, file_stem, cfg):
    """Build one output record from the raw *item* found at position *index*."""
    media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()

    objects_list = [
        {
            "bbox": obj.get("bbox", []),
            "segmentation": {},
            "category_name": obj.get("name", ""),
        }
        for obj in item.get("objects", [])
    ]
    # The object list is wrapped in an outer list, so "annotations" is always
    # a one-element list of lists.
    annotations = [objects_list]

    # Only the first entry of "questions" is used.
    questions_list = item.get('questions', [])
    question_for_eval = str(questions_list[0]) if questions_list else ""
    question_text, question_type, options = _parse_question(question_for_eval)

    raw_answer = item.get('answer', '')
    if question_type == "multi-choice":
        # Upper-cased even when no option matched and the raw value is kept.
        final_answer = _match_answer(raw_answer, options).strip().upper()
    else:
        final_answer = str(raw_answer).strip()

    return {
        "index": index,
        "media_type": "image",
        "media_paths": media_path,
        "description": str(item.get('relation', "")),
        "task_type": cfg['task_type'],
        "question": [question_text],
        "question_type": question_type,
        "options": options,
        "annotations": annotations,
        "answer": [final_answer],
        "source": cfg['source'],
        "domain": cfg['domain'],
    }


def convert_medical_json(input_file, output_file, config=None):
    """Convert one raw JSON file into the unified record format.

    Args:
        input_file: Path of the JSON file to read. It may hold a list of
            records or a single record object.
        output_file: Path the converted list of records is written to as
            UTF-8 JSON.
        config: Optional dict overriding the default ``task_type``,
            ``source`` and ``domain`` values stamped on every record.

    Returns:
        True when the file was converted and written, False when any error
        occurred (the error is printed, not raised, and no output is written).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai",
    }
    cfg = {**default_config, **(config or {})}
    file_stem = Path(input_file).stem

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        records = raw_data if isinstance(raw_data, list) else [raw_data]
        converted = [
            _convert_item(index, item, file_stem, cfg)
            for index, item in enumerate(records)
        ]

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        # Per-file boundary: report the failure and let the batch continue.
        print(f"转换失败: {input_file} → {e}")
        return False


def process_single_file(input_path, output_dir, config):
    """Convert *input_path* into a same-named file inside *output_dir*.

    Returns True on success and False on any failure; never raises.
    """
    try:
        output_file = output_dir / input_path.name
        return convert_medical_json(
            input_file=str(input_path),
            output_file=str(output_file),
            config=config,
        )
    except Exception as e:
        print(f"文件处理异常: {input_path} → {e}")
        return False


def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every ``*.json`` file directly inside *input_dir* using a thread pool.

    Args:
        input_dir: Directory scanned (non-recursively) for ``*.json`` files.
        output_dir: Directory for the converted files; created if missing.
        config: Optional overrides forwarded to ``convert_medical_json``.
        max_workers: Size of the thread pool.

    Prints a success/failure summary and the resolved output directory.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                process_single_file,
                input_path=input_file,
                output_dir=output_path,
                config=config,
            )
            for input_file in input_path.glob('*.json')
            if input_file.is_file()
        ]
        outcomes = [future.result() for future in futures]

    success_count = sum(1 for ok in outcomes if ok)
    failure_count = len(outcomes) - success_count

    print(f"\n处理完成: 成功 {success_count} 个,失败 {failure_count} 个")
    print(f"输出目录: {output_path.resolve()}")


if __name__ == "__main__":
    custom_config = {
        "source": "EmbSpatial",
        "task_type": "Object-Detection",
        "domain": "Embodied_ai",
    }
    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai/EmbSpatial",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            # os.cpu_count() may return None when the count is undetermined.
            max_workers=(os.cpu_count() or 1) * 2,
        )
    except Exception as e:
        print(f"批处理异常: {e}")