File size: 8,235 Bytes
1c980b1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
import json
import os
import re
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def convert_medical_json(input_file, output_file, config=None):
"""医疗数据格式转换器(支持多格式选项解析)"""
default_config = {
"task_type": "Visual_Question_Answering",
"source": "Embspatial",
"domain": "Embodied_ai"
}
cfg = {**default_config, **(config or {})}
input_path = Path(input_file)
file_stem = input_path.stem
try:
with open(input_file, 'r', encoding='utf-8') as f:
raw_data = json.load(f)
converted = []
for index, item in enumerate(raw_data if isinstance(raw_data, list) else [raw_data]):
# 媒体路径生成
media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()
# 处理对象标注
annotations = []
objects_list = []
for obj in item.get("objects", []):
annotation = {
"bbox": obj.get("bbox", []),
"segmentation": {},
"category_name": obj.get("name", "")
}
objects_list.append(annotation)
annotations.append(objects_list)
# 问题解析增强逻辑
questions_list = item.get('questions', [])
question_for_eval = str(questions_list[0]) if questions_list else ""
options = []
question_text = ""
question_type = "free-form"
# 多格式选项解析
if "Options:" in question_for_eval:
question_type = "multi-choice"
q_parts = question_for_eval.split("Options:", 1)
question_part = q_parts[0].strip()
choices_part = q_parts[1].strip() if len(q_parts) > 1 else ""
# 清理问题文本
question_text = re.sub(r'\s+', ' ', question_part.replace("\n", " ")).strip()
# 自动生成选项ID的计数器
option_id_counter = 65 # ASCII 'A'
# 分层次解析选项
for line in re.split(r'[\n;]', choices_part):
line = line.strip()
if not line:
continue
# 处理分号分隔的选项(新增逻辑)
if re.match(r'^[^:\.]+$', line): # 没有冒号或点号的情况
for sub_opt in re.split(r';\s*', line):
sub_opt = sub_opt.strip()
if sub_opt:
options.append({
"id": chr(option_id_counter),
"text": re.sub(r'\s+', ' ', sub_opt)
})
option_id_counter += 1
else:
# 处理标准格式(A: 或 A.)
match = re.match(r'^([A-Za-z])[\.:]\s*(.+)$', line)
if match:
opt_id, opt_text = match.groups()
options.append({
"id": opt_id.upper(),
"text": re.sub(r'\s+', ' ', opt_text.strip())
})
else:
# 保底处理:自动生成ID
options.append({
"id": chr(option_id_counter),
"text": re.sub(r'\s+', ' ', line.strip())
})
option_id_counter += 1
else:
# 自由格式问题处理
question_text = re.sub(r'\s+', ' ', question_for_eval.replace("\n", " ")).strip()
# 智能答案匹配系统
def match_answer(raw_answer, options_list):
"""四层答案匹配机制"""
raw_answer = str(raw_answer).strip()
if not raw_answer:
return ""
# 1. 直接ID匹配
id_map = {opt['id'].upper(): opt['id'] for opt in options_list}
if raw_answer.upper() in id_map:
return id_map[raw_answer.upper()]
# 2. 精确文本匹配
text_to_id = {opt['text'].lower(): opt['id'] for opt in options_list}
if raw_answer.lower() in text_to_id:
return text_to_id[raw_answer.lower()]
# 3. 包含匹配(去除标点)
clean_answer = re.sub(r'[^\w\s]', '', raw_answer).lower()
for opt in options_list:
clean_text = re.sub(r'[^\w\s]', '', opt['text']).lower()
if clean_answer in clean_text:
return opt['id']
# 4. 首字母匹配
if len(raw_answer) == 1 and raw_answer.isalpha():
return raw_answer.upper()
return raw_answer # 保底返回原始值
# 处理答案
raw_answer = item.get('answer', '')
processed_answer = match_answer(raw_answer, options) if question_type == "multi-choice" else str(raw_answer)
answer = [processed_answer.strip().upper() if question_type == "multi-choice" else processed_answer.strip()]
converted.append({
"index": index,
"media_type": "image",
"media_paths": media_path,
"description": str(item.get('relation', "")),
"task_type": cfg['task_type'],
"question": [question_text],
"question_type": question_type,
"options": options,
"annotations": annotations,
"answer": answer,
"source": cfg['source'],
"domain": cfg['domain']
})
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(converted, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
print(f"转换失败: {input_file} → {str(e)}")
return False
def process_single_file(input_path, output_dir, config):
"""文件处理单元"""
try:
output_file = output_dir / input_path.name
return convert_medical_json(
input_file=str(input_path),
output_file=str(output_file),
config=config
)
except Exception as e:
print(f"文件处理异常: {input_path} → {str(e)}")
return False
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
"""并行批量处理器"""
input_path = Path(input_dir)
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
success_count = 0
failure_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for input_file in input_path.glob('*.json'):
if input_file.is_file():
futures.append(executor.submit(
process_single_file,
input_path=input_file,
output_dir=output_path,
config=config
))
for future in futures:
success_count += 1 if future.result() else 0
failure_count += 0 if future.result() else 1
print(f"\n处理完成: 成功 {success_count} 个,失败 {failure_count} 个")
print(f"输出目录: {output_path.resolve()}")
if __name__ == "__main__":
custom_config = {
"source": "EmbSpatial",
"task_type": "Object-Detection",
"domain": "Embodied_ai"
}
try:
batch_convert_json(
input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai/EmbSpatial",
output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
config=custom_config,
max_workers=os.cpu_count() * 2
)
except Exception as e:
print(f"批处理异常: {str(e)}") |