|
|
import json |
|
|
import os |
|
|
import re |
|
|
from pathlib import Path |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
# Precompiled patterns shared by the helpers below.
_WHITESPACE_RE = re.compile(r'\s+')
_OPTION_SPLIT_RE = re.compile(r'[\n;]')
_LABELED_OPTION_RE = re.compile(r'^([A-Za-z])[\.:]\s*(.+)$')
_PUNCTUATION_RE = re.compile(r'[^\w\s]')


def _collapse_ws(text):
    """Collapse every whitespace run in ``text`` to one space and trim the ends."""
    return _WHITESPACE_RE.sub(' ', text).strip()


def _parse_options(choices_part):
    """Parse the text following ``Options:`` into ``{"id", "text"}`` dicts.

    Choices are separated by newlines or semicolons. A choice written as
    ``A. text`` / ``a: text`` keeps its own letter (upper-cased); any other
    choice receives the next sequential letter starting at ``A``.
    """
    options = []
    next_id = ord('A')
    for chunk in _OPTION_SPLIT_RE.split(choices_part):
        chunk = chunk.strip()
        if not chunk:
            continue
        labeled = _LABELED_OPTION_RE.match(chunk)
        if labeled:
            opt_id, opt_text = labeled.groups()
            options.append({"id": opt_id.upper(), "text": _collapse_ws(opt_text)})
        else:
            # Unlabelled choice: only these advance the sequential counter.
            options.append({"id": chr(next_id), "text": _collapse_ws(chunk)})
            next_id += 1
    return options


def _match_answer(raw_answer, options_list):
    """Map a raw answer onto an option id using four fallback layers.

    1. The answer equals an option id (case-insensitive).
    2. The answer equals an option text (case-insensitive).
    3. The answer, with punctuation removed, is contained in an option text.
    4. The answer is a single letter, returned upper-cased.

    When nothing matches, the stripped raw answer is returned unchanged.
    An empty answer yields ``""``.
    """
    raw_answer = str(raw_answer).strip()
    if not raw_answer:
        return ""

    # Layer 1: exact option id.
    id_map = {opt['id'].upper(): opt['id'] for opt in options_list}
    if raw_answer.upper() in id_map:
        return id_map[raw_answer.upper()]

    # Layer 2: exact option text.
    text_to_id = {opt['text'].lower(): opt['id'] for opt in options_list}
    if raw_answer.lower() in text_to_id:
        return text_to_id[raw_answer.lower()]

    # Layer 3: punctuation-insensitive containment. A punctuation-only answer
    # cleans to an empty string, which is a substring of every option text and
    # would otherwise be mapped to the first option, so it is skipped here.
    clean_answer = _PUNCTUATION_RE.sub('', raw_answer).lower()
    if clean_answer.strip():
        for opt in options_list:
            clean_text = _PUNCTUATION_RE.sub('', opt['text']).lower()
            if clean_answer in clean_text:
                return opt['id']

    # Layer 4: a lone letter is treated as an option id even if unlisted.
    if len(raw_answer) == 1 and raw_answer.isalpha():
        return raw_answer.upper()

    return raw_answer


def convert_medical_json(input_file, output_file, config=None):
    """Convert one raw QA JSON file into the unified record format.

    The input may be a single JSON object or a list of objects. For each item
    the function reads the ``objects``, ``questions``, ``answer`` and
    ``relation`` keys, parses an optional ``Options:`` section out of the first
    question, resolves the answer to an option id for multi-choice questions,
    and writes the list of converted records to ``output_file`` as JSON.

    Despite the function name, the default configuration targets the
    Embspatial dataset.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write.
        config: Optional dict overriding ``task_type``, ``source`` and/or
            ``domain`` in the default configuration.

    Returns:
        True when the output file was written; False when any exception was
        raised (the error is printed, not propagated).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai"
    }
    cfg = {**default_config, **(config or {})}
    file_stem = Path(input_file).stem

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # A lone object is handled as a one-element list.
        items = raw_data if isinstance(raw_data, list) else [raw_data]

        converted = []
        for index, item in enumerate(items):
            # Image path is derived from the input file stem and item position.
            media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()

            objects_list = [
                {
                    "bbox": obj.get("bbox", []),
                    "segmentation": {},
                    "category_name": obj.get("name", "")
                }
                for obj in item.get("objects", [])
            ]
            # All objects of the item form a single inner annotation list.
            annotations = [objects_list]

            # Only the first question of each item is used.
            questions_list = item.get('questions', [])
            question_for_eval = str(questions_list[0]) if questions_list else ""

            if "Options:" in question_for_eval:
                question_type = "multi-choice"
                question_part, choices_part = question_for_eval.split("Options:", 1)
                question_text = _collapse_ws(question_part)
                options = _parse_options(choices_part)
            else:
                question_type = "free-form"
                question_text = _collapse_ws(question_for_eval)
                options = []

            raw_answer = item.get('answer', '')
            if question_type == "multi-choice":
                answer = [_match_answer(raw_answer, options).strip().upper()]
            else:
                answer = [str(raw_answer).strip()]

            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": str(item.get('relation', "")),
                "task_type": cfg['task_type'],
                "question": [question_text],
                "question_type": question_type,
                "options": options,
                "annotations": annotations,
                "answer": answer,
                "source": cfg['source'],
                "domain": cfg['domain']
            })

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True

    except Exception as e:
        # Best-effort boundary: report the failure and let the batch continue.
        print(f"转换失败: {input_file} → {str(e)}")
        return False
|
|
|
|
|
def process_single_file(input_path, output_dir, config):
    """Convert a single JSON file into ``output_dir`` under the same file name.

    Args:
        input_path: Path (``str`` or ``pathlib.Path``) of the JSON file to convert.
        output_dir: Directory (``str`` or ``pathlib.Path``) receiving the output.
        config: Configuration overrides forwarded to ``convert_medical_json``.

    Returns:
        The result of ``convert_medical_json``; False if an exception was
        raised while preparing or running the conversion (the error is
        printed, not propagated).
    """
    try:
        # Coerce both arguments so plain strings work as well as Path objects.
        source = Path(input_path)
        output_file = Path(output_dir) / source.name
        return convert_medical_json(
            input_file=str(source),
            output_file=str(output_file),
            config=config
        )
    except Exception as e:
        # Keep one bad file from aborting the whole batch.
        print(f"文件处理异常: {input_path} → {str(e)}")
        return False
|
|
|
|
|
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every ``*.json`` file directly inside ``input_dir`` using a thread pool.

    The output directory is created if needed (including parents). Each file
    is handed to ``process_single_file``; afterwards the number of successful
    and failed conversions and the resolved output directory are printed.

    Args:
        input_dir: Directory scanned (non-recursively) for ``*.json`` files.
        output_dir: Directory that receives the converted files.
        config: Optional configuration overrides passed through per file.
        max_workers: Thread pool size.
    """
    source_dir = Path(input_dir)
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [
            pool.submit(
                process_single_file,
                input_path=candidate,
                output_dir=target_dir,
                config=config
            )
            for candidate in source_dir.glob('*.json')
            if candidate.is_file()
        ]
        # Collect outcomes in submission order; any truthy result is a success.
        outcomes = [bool(task.result()) for task in pending]

    success_count = sum(outcomes)
    failure_count = len(outcomes) - success_count

    print(f"\n处理完成: 成功 {success_count} 个,失败 {failure_count} 个")
    print(f"输出目录: {target_dir.resolve()}")
|
|
|
|
|
if __name__ == "__main__":
    # Values overriding the converter's default configuration for this run.
    custom_config = {
        "source": "EmbSpatial",
        "task_type": "Object-Detection",
        "domain": "Embodied_ai"
    }

    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai/EmbSpatial",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            # os.cpu_count() returns None when the count cannot be determined;
            # fall back to 1 so the multiplication cannot raise TypeError.
            max_workers=(os.cpu_count() or 1) * 2
        )
    except Exception as e:
        # Top-level boundary: report the failure instead of a traceback.
        print(f"批处理异常: {str(e)}")