File size: 8,235 Bytes
1c980b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import json
import os
import re
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

# Pre-compiled patterns shared by the helpers below.
# An explicitly labelled option such as "A: text" or "b. text".
_LABELLED_OPTION_RE = re.compile(r'^([A-Za-z])[\.:]\s*(.+)$')
# Options are separated by newlines and/or semicolons.
_OPTION_DELIMITER_RE = re.compile(r'[\n;]')
_WHITESPACE_RUN_RE = re.compile(r'\s+')
_PUNCTUATION_RE = re.compile(r'[^\w\s]')
_ASCII_LETTER_RE = re.compile(r'[A-Za-z]')


def _squash_whitespace(text):
    """Collapse every whitespace run in *text* to one space and trim the ends."""
    return _WHITESPACE_RUN_RE.sub(' ', text).strip()


def _build_annotations(item):
    """Return the per-item annotation list: one inner list holding all objects."""
    objects = [
        {
            "bbox": obj.get("bbox", []),
            "segmentation": {},
            "category_name": obj.get("name", ""),
        }
        for obj in item.get("objects", [])
    ]
    return [objects]


def _parse_question(raw_question):
    """Split a raw question string into ``(question_text, question_type, options)``.

    A question containing the marker ``"Options:"`` is treated as
    multi-choice: the text before the first marker is the question and the
    text after it is split on newlines/semicolons into options.  Options
    written as ``"A: text"`` / ``"A. text"`` keep their (upper-cased) letter;
    every other option receives the next automatically generated letter
    starting at ``"A"``.  Without the marker the question is free-form and
    the option list is empty.
    """
    if "Options:" not in raw_question:
        return _squash_whitespace(raw_question), "free-form", []

    stem, _, choices = raw_question.partition("Options:")
    options = []
    next_auto_id = ord('A')
    for chunk in _OPTION_DELIMITER_RE.split(choices.strip()):
        chunk = chunk.strip()
        if not chunk:
            continue
        labelled = _LABELLED_OPTION_RE.match(chunk)
        if labelled:
            label, text = labelled.groups()
            options.append({"id": label.upper(), "text": _squash_whitespace(text)})
        else:
            # No usable label: assign the next generated ID.
            options.append({"id": chr(next_auto_id), "text": _squash_whitespace(chunk)})
            next_auto_id += 1
    return _squash_whitespace(stem), "multi-choice", options


def _match_answer(raw_answer, options_list):
    """Map a raw answer onto an option ID, falling back to the raw text.

    Matching is attempted in this order:

    1. the answer equals an option ID (case-insensitive);
    2. the answer equals an option text (case-insensitive);
    3. the answer is a single ASCII letter -> it is taken to be an option
       letter and returned upper-cased;
    4. the punctuation-stripped answer is contained in a
       punctuation-stripped option text (first such option wins);
    5. the answer is any other single alphabetic character -> upper-cased.

    If nothing matches, the stripped answer is returned unchanged.
    """
    raw_answer = str(raw_answer).strip()
    if not raw_answer:
        return ""

    # 1. Direct ID match.
    id_map = {opt['id'].upper(): opt['id'] for opt in options_list}
    if raw_answer.upper() in id_map:
        return id_map[raw_answer.upper()]

    # 2. Exact text match.
    text_to_id = {opt['text'].lower(): opt['id'] for opt in options_list}
    if raw_answer.lower() in text_to_id:
        return text_to_id[raw_answer.lower()]

    # 3. A lone ASCII letter is an option letter, not a text fragment.  This
    #    must run before the containment test, otherwise "E" would match any
    #    option whose text merely contains the letter "e".
    if _ASCII_LETTER_RE.fullmatch(raw_answer):
        return raw_answer.upper()

    # 4. Containment match with punctuation removed.  An answer that is
    #    empty after cleaning is skipped: the empty string is a substring of
    #    everything and would always select the first option.
    clean_answer = _PUNCTUATION_RE.sub('', raw_answer).lower()
    if clean_answer:
        for opt in options_list:
            clean_text = _PUNCTUATION_RE.sub('', opt['text']).lower()
            if clean_answer in clean_text:
                return opt['id']

    # 5. Any remaining single alphabetic character.
    if len(raw_answer) == 1 and raw_answer.isalpha():
        return raw_answer.upper()

    return raw_answer  # Fallback: keep the original value.


def convert_medical_json(input_file, output_file, config=None):
    """Convert one raw JSON file into the unified question/answer record format.

    The input may be a single JSON object or a list of objects.  For each
    object the first entry of ``questions`` is parsed (see
    ``_parse_question``), ``answer`` is resolved against the parsed options
    for multi-choice questions, and ``objects`` are turned into annotations.
    The resulting list of records is written to *output_file* as UTF-8 JSON.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write.
        config: Optional dict overriding the default ``task_type``,
            ``source`` and ``domain`` values stored in every record.

    Returns:
        True on success.  Any exception raised while reading, converting or
        writing is caught, reported on stdout, and turned into False.
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai"
    }
    cfg = {**default_config, **(config or {})}
    file_stem = Path(input_file).stem

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        items = raw_data if isinstance(raw_data, list) else [raw_data]
        converted = []
        for index, item in enumerate(items):
            # Image path is derived from the input file name and item position.
            media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()

            annotations = _build_annotations(item)

            # Only the first question of each item is converted.
            questions_list = item.get('questions', [])
            raw_question = str(questions_list[0]) if questions_list else ""
            question_text, question_type, options = _parse_question(raw_question)

            raw_answer = item.get('answer', '')
            if question_type == "multi-choice":
                processed_answer = _match_answer(raw_answer, options).strip().upper()
            else:
                processed_answer = str(raw_answer).strip()

            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": str(item.get('relation', "")),
                "task_type": cfg['task_type'],
                "question": [question_text],
                "question_type": question_type,
                "options": options,
                "annotations": annotations,
                "answer": [processed_answer],
                "source": cfg['source'],
                "domain": cfg['domain']
            })

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True

    except Exception as e:
        # Best-effort batch semantics: report the failure and let the caller
        # continue with the remaining files.
        print(f"转换失败: {input_file}: {e}")
        return False

def process_single_file(input_path, output_dir, config):
    """Convert one input file into *output_dir*, keeping its file name.

    Args:
        input_path: Path (``str`` or ``pathlib.Path``) of the JSON file to
            convert.
        output_dir: Directory (``str`` or ``pathlib.Path``) that receives the
            converted file; the output uses the same file name as the input.
        config: Configuration overrides forwarded to ``convert_medical_json``.

    Returns:
        The result of ``convert_medical_json``, or False if an unexpected
        exception occurs here (it is reported on stdout rather than raised,
        so one bad file does not abort a batch).
    """
    try:
        # Coerce both arguments so plain strings work as well as Path objects;
        # `str / str` would otherwise raise TypeError.
        source = Path(input_path)
        output_file = Path(output_dir) / source.name
        return convert_medical_json(
            input_file=str(source),
            output_file=str(output_file),
            config=config
        )
    except Exception as e:
        print(f"文件处理异常: {input_path}: {e}")
        return False

def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every ``*.json`` file directly inside *input_dir* using a thread pool.

    The output directory is created if necessary (including parents).  Each
    matching regular file is handed to ``process_single_file`` on a worker
    thread; once all tasks have finished, the number of successes and
    failures and the resolved output directory are printed.

    Args:
        input_dir: Directory scanned (non-recursively) for ``*.json`` files.
        output_dir: Directory that receives the converted files.
        config: Optional configuration overrides forwarded to each task.
        max_workers: Thread-pool size passed to ``ThreadPoolExecutor``.

    Returns:
        None.
    """
    source_dir = Path(input_dir)
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [
            pool.submit(
                process_single_file,
                input_path=candidate,
                output_dir=target_dir,
                config=config
            )
            for candidate in source_dir.glob('*.json')
            if candidate.is_file()
        ]
        # Collect each task's outcome in submission order.
        outcomes = [bool(task.result()) for task in pending]

    succeeded = sum(1 for ok in outcomes if ok)
    failed = len(outcomes) - succeeded

    print(f"\n处理完成: 成功 {succeeded} 个,失败 {failed} 个")
    print(f"输出目录: {target_dir.resolve()}")

if __name__ == "__main__":
    # Overrides merged over the converter's built-in defaults.
    custom_config = {
        "source": "EmbSpatial",
        "task_type": "Object-Detection",
        "domain": "Embodied_ai"
    }

    # os.cpu_count() returns None when the CPU count cannot be determined;
    # fall back to 1 so the multiplication cannot raise TypeError.
    worker_count = (os.cpu_count() or 1) * 2

    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai/EmbSpatial",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            max_workers=worker_count
        )
    except Exception as e:
        # Top-level boundary: report the failure instead of a traceback.
        print(f"批处理异常: {str(e)}")