# tools/utils/json/ems_jtj.py
# Uploaded by Adinosaur ("Upload folder using huggingface_hub"), revision 1c980b1 (verified).
import json
import os
import re
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def _normalize_whitespace(text):
    """Collapse each run of whitespace in *text* to one space and trim both ends."""
    return re.sub(r'\s+', ' ', text).strip()


def _parse_options(choices_part):
    """Parse the text that follows ``Options:`` into ``{"id", "text"}`` dicts.

    Options are separated by newlines or semicolons.  An entry written as
    ``A. text`` or ``A: text`` keeps its own letter (upper-cased); any other
    entry receives the next automatically generated letter, starting at "A".
    """
    options = []
    next_auto_id = ord('A')
    for chunk in re.split(r'[\n;]', choices_part):
        chunk = chunk.strip()
        if not chunk:
            continue
        labelled = re.match(r'^([A-Za-z])[\.:]\s*(.+)$', chunk)
        if labelled:
            option_id = labelled.group(1).upper()
            option_text = labelled.group(2)
            # Keep the generated-ID counter ahead of explicit labels so that a
            # later unlabelled entry cannot reuse a letter already taken.
            next_auto_id = max(next_auto_id, ord(option_id) + 1)
        else:
            option_id = chr(next_auto_id)
            option_text = chunk
            next_auto_id += 1
        options.append({
            "id": option_id,
            "text": _normalize_whitespace(option_text)
        })
    return options


def _match_answer(raw_answer, options_list):
    """Map a raw answer onto an option ID, trying four strategies in order.

    1. the answer is itself an option ID (case-insensitive);
    2. the answer equals an option's text (case-insensitive);
    3. the answer, with punctuation removed, is contained in an option's text;
    4. the answer is a single letter, which is returned upper-cased.

    If nothing matches, the stripped answer is returned unchanged; an empty
    answer yields "".
    """
    answer = str(raw_answer).strip()
    if not answer:
        return ""
    # 1. direct ID match
    id_map = {opt['id'].upper(): opt['id'] for opt in options_list}
    if answer.upper() in id_map:
        return id_map[answer.upper()]
    # 2. exact text match
    text_to_id = {opt['text'].lower(): opt['id'] for opt in options_list}
    if answer.lower() in text_to_id:
        return text_to_id[answer.lower()]
    # 3. containment match with punctuation removed
    clean_answer = re.sub(r'[^\w\s]', '', answer).lower()
    # A punctuation-only answer cleans down to "", and "" is a substring of
    # every string, so it must not be allowed to select the first option.
    if clean_answer:
        for opt in options_list:
            clean_text = re.sub(r'[^\w\s]', '', opt['text']).lower()
            if clean_answer in clean_text:
                return opt['id']
    # 4. single-letter answer
    if len(answer) == 1 and answer.isalpha():
        return answer.upper()
    return answer  # fallback: hand back the original value


def _convert_record(index, item, file_stem, cfg):
    """Build one output record from the raw record *item* at position *index*."""
    # The image path is derived from the input file's stem and the record's
    # position; it is not read from the record itself.
    media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()

    # ``"objects": null`` is treated the same as a missing key.
    object_annotations = [
        {
            "bbox": obj.get("bbox", []),
            "segmentation": {},
            "category_name": obj.get("name", "")
        }
        for obj in (item.get("objects") or [])
    ]

    # Only the first question is converted.  A bare string is taken whole;
    # indexing it with [0] would keep just its first character.
    questions = item.get('questions', [])
    if isinstance(questions, str):
        question_for_eval = questions
    elif questions:
        question_for_eval = str(questions[0])
    else:
        question_for_eval = ""

    raw_answer = item.get('answer', '')
    if "Options:" in question_for_eval:
        question_type = "multi-choice"
        question_part, choices_part = question_for_eval.split("Options:", 1)
        question_text = _normalize_whitespace(question_part)
        options = _parse_options(choices_part.strip())
        answer_text = _match_answer(raw_answer, options).strip().upper()
    else:
        question_type = "free-form"
        question_text = _normalize_whitespace(question_for_eval)
        options = []
        answer_text = str(raw_answer).strip()

    return {
        "index": index,
        "media_type": "image",
        "media_paths": media_path,
        "description": str(item.get('relation', "")),
        "task_type": cfg['task_type'],
        "question": [question_text],
        "question_type": question_type,
        "options": options,
        # The per-image object list is wrapped in an outer list.
        "annotations": [object_annotations],
        "answer": [answer_text],
        "source": cfg['source'],
        "domain": cfg['domain']
    }


def convert_medical_json(input_file, output_file, config=None):
    """Convert one raw question/answer JSON file into the unified record format.

    Despite its name, nothing in this function is specific to medical data:
    the default metadata tags records as ``Embspatial`` / ``Embodied_ai``.

    The input may be a single JSON object or a list of objects.  For each
    record the first entry of ``questions`` is used; when it contains
    ``Options:`` the record becomes ``multi-choice`` and the answer is mapped
    to an option ID, otherwise it is ``free-form``.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write.
        config: Optional dict overriding ``task_type``, ``source`` and/or
            ``domain``.

    Returns:
        True when the output file was written; False when any error occurred
        (the error is printed, not raised).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai"
    }
    cfg = {**default_config, **(config or {})}
    file_stem = Path(input_file).stem
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        records = raw_data if isinstance(raw_data, list) else [raw_data]
        converted = [
            _convert_record(index, item, file_stem, cfg)
            for index, item in enumerate(records)
        ]
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        # Best-effort boundary: the batch driver relies on a boolean result.
        print(f"转换失败: {input_file}: {e}")
        return False
def process_single_file(input_path, output_dir, config):
    """Convert one input file into *output_dir*, keeping the same file name.

    Args:
        input_path: Path (``str`` or ``pathlib.Path``) of the JSON file to convert.
        output_dir: Directory (``str`` or ``pathlib.Path``) that receives the result.
        config: Metadata overrides forwarded to ``convert_medical_json``.

    Returns:
        The boolean result of ``convert_medical_json``, or False when an
        unexpected error occurred (the error is printed, not raised).
    """
    try:
        # Coerce both arguments so plain strings work as well as Path objects.
        source = Path(input_path)
        output_file = Path(output_dir) / source.name
        return convert_medical_json(
            input_file=str(source),
            output_file=str(output_file),
            config=config
        )
    except Exception as e:
        # Best-effort boundary: one bad file must not abort the whole batch.
        print(f"文件处理异常: {input_path}: {e}")
        return False
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every ``*.json`` file directly inside *input_dir* using a thread pool.

    The output directory is created if needed.  Each file is handed to
    ``process_single_file``; a summary of successes and failures plus the
    resolved output directory is printed at the end.

    Args:
        input_dir: Directory searched (non-recursively) for ``*.json`` files.
        output_dir: Directory that receives the converted files.
        config: Optional metadata overrides forwarded to each conversion.
        max_workers: Thread-pool size.
    """
    source_dir = Path(input_dir)
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [
            pool.submit(
                process_single_file,
                input_path=candidate,
                output_dir=target_dir,
                config=config
            )
            for candidate in source_dir.glob('*.json')
            if candidate.is_file()
        ]
        # Collect results in submission order; a truthy result counts as success.
        outcomes = [bool(task.result()) for task in pending]

    succeeded = outcomes.count(True)
    failed = len(outcomes) - succeeded
    print(f"\n处理完成: 成功 {succeeded} 个,失败 {failed} 个")
    print(f"输出目录: {target_dir.resolve()}")
if __name__ == "__main__":
    # Metadata overrides applied on top of convert_medical_json's defaults.
    custom_config = {
        "source": "EmbSpatial",
        "task_type": "Object-Detection",
        "domain": "Embodied_ai"
    }
    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai/EmbSpatial",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            # os.cpu_count() may return None when the count cannot be
            # determined; fall back to 1 so the multiplication cannot fail.
            max_workers=(os.cpu_count() or 1) * 2
        )
    except Exception as e:
        print(f"批处理异常: {str(e)}")