import json
import math
from pathlib import Path

import numpy as np
import cv2
from pycocotools import mask as mask_utils
from PIL import Image, ExifTags


def get_image_dimensions(image_path):
    """Return the (width, height) of an image, robust to several failure modes.

    Tries PIL first (and honours the EXIF Orientation tag), then falls back
    to OpenCV. Returns ``(0, 0)`` when the size cannot be determined at all,
    so the caller can fail gracefully instead of crashing.

    Args:
        image_path: Path (or str) to the image file.

    Returns:
        Tuple ``(width, height)``; ``(0, 0)`` signals an unrecoverable error.
    """
    # Normalize to Path so the .name accesses below work even for str input.
    image_path = Path(image_path)
    try:
        # Prefer PIL: it gives us access to EXIF metadata.
        with Image.open(image_path) as img:
            width, height = img.size
            orientation = 1
            try:
                # NOTE: _getexif() is a private PIL API but is the common way
                # to read raw EXIF tags; it may return None for non-JPEGs.
                exif = img._getexif() or {}
                for tag, name in ExifTags.TAGS.items():
                    if name == 'Orientation':
                        orientation = exif.get(tag, 1)
                        break
            except Exception as e:
                print(f"EXIF读取警告 [{image_path.name}]: {str(e)}")
            # Orientations 5-8 involve a 90°/270° rotation, so the stored
            # width/height are swapped relative to the displayed image.
            if orientation in [5, 6, 7, 8]:
                return height, width  # swapped: displayed (width, height)
            else:
                return width, height
    except Exception as pil_error:
        print(f"PIL读取失败 [{image_path.name}], 尝试OpenCV: {str(pil_error)}")
        try:
            # Fallback: OpenCV (ignores EXIF orientation by design here).
            img = cv2.imread(str(image_path))
            if img is not None:
                h, w = img.shape[:2]
                return w, h
            raise ValueError("OpenCV返回空图像")
        except Exception as cv_error:
            print(f"严重错误: 无法获取尺寸 [{image_path.name}]: {str(cv_error)}")
            # Invalid size: downstream code checks for 0 and reports,
            # instead of crashing here.
            return (0, 0)


def points_to_rle(points, img_dimensions):
    """Convert a polygon to COCO RLE with safe coordinate clamping.

    Each vertex is rounded to the nearest integer and clamped into the image
    bounds before the polygon is rasterized and RLE-encoded.

    Args:
        points: Iterable of (x, y) vertex coordinates (floats allowed).
        img_dimensions: Tuple ``(width, height)`` of the target image.

    Returns:
        Dict with COCO-style ``"size"`` ([height, width]) and utf-8 decoded
        ``"counts"`` RLE string.

    Raises:
        ValueError: If fewer than 3 points are supplied.
    """
    width, height = img_dimensions

    # Validate up front: a polygon needs at least 3 vertices.
    points = list(points)
    if len(points) < 3:
        raise ValueError("无效多边形,点数不足3个")

    polygon = []
    for x, y in points:
        # Round first, then clamp into [0, dim-1]. (If the annotation tool
        # truncates instead of rounding, switch to math.floor here.)
        safe_x = min(max(0, int(round(x))), width - 1)
        safe_y = min(max(0, int(round(y))), height - 1)
        polygon.append((safe_x, safe_y))

    # Rasterize the polygon into a binary mask, then RLE-encode it.
    # pycocotools requires a Fortran-ordered array.
    mask = np.zeros((height, width), dtype=np.uint8)
    cv2.fillPoly(mask, [np.array(polygon, dtype=np.int32)], color=1)
    rle = mask_utils.encode(np.asfortranarray(mask))
    return {
        "size": [height, width],
        "counts": rle['counts'].decode('utf-8')
    }


def convert_medical_json(input_file, config=None):
    """Convert one LabelMe-style annotation JSON into the target dataset schema.

    Expects a sibling ``.jpg`` image next to the JSON file. Shapes labelled
    ``'target'`` are converted to RLE segmentations; others are skipped.

    Args:
        input_file: Path (or str) to the annotation JSON file.
        config: Optional dict overriding ``task_type`` / ``source`` / ``domain``.

    Returns:
        A single-element list with the converted record, or ``None`` on failure.
    """
    cfg = {
        "task_type": "Image-Segmentation",
        "source": "Lisa",
        "domain": "General",
        **(config or {})
    }
    # Hoisted out of the try block: the except handler below uses
    # input_path.name, which would be unbound if this line were inside
    # the try and something raised before it completed.
    input_path = Path(input_file)
    try:
        image_path = input_path.with_suffix('.jpg')

        # The image must exist; a record without pixels is useless.
        if not image_path.exists():
            raise FileNotFoundError(f"关联图片不存在: {image_path.name}")

        # Build a relative POSIX media path: ./data/<source>/<image name>
        media_paths = (Path(".") / "data" / cfg['source'] / image_path.name).as_posix()
        media_paths = f"./{media_paths}"

        # Real dimensions (EXIF orientation already handled).
        width, height = get_image_dimensions(image_path)
        if width == 0 or height == 0:
            raise ValueError("获取图片尺寸失败")

        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        annotations = []
        for shape in raw_data.get('shapes', []):
            # Only shapes explicitly labelled 'target' are converted.
            if shape.get('label') != 'target':
                continue
            points = shape.get('points', [])
            try:
                rle = points_to_rle(points, (width, height))
                annotations.append({
                    "bbox": [],
                    "segmentation": rle,
                    "category_name": ""
                })
            except ValueError as e:
                # Skip degenerate polygons but keep converting the file.
                print(f"标注跳过 [{input_path.name}]: {str(e)}")

        # index is a placeholder here; batch_convert assigns the global index.
        return [{
            "index": 0,
            "media_type": "image",
            "media_paths": media_paths,
            "description": "",
            "task_type": cfg['task_type'],
            "question": raw_data.get('text', []),
            "question_type": "detection-form",
            "options": [],
            "annotations": [annotations],
            "answer": [],
            "source": cfg['source'],
            "domain": cfg['domain']
        }]
    except Exception as e:
        print(f"转换失败 [{input_path.name}]: {str(e)}")
        return None


def batch_convert(input_dir, output_file):
    """Convert every ``*.json`` in *input_dir* and write one combined JSON file.

    Assigns a globally increasing ``index`` across all converted records and
    reports per-file success/failure at the end.

    Args:
        input_dir: Directory containing annotation JSON files.
        output_file: Path of the combined output JSON file.
    """
    input_dir = Path(input_dir)
    all_data = []
    success_count = 0
    failed_files = []
    index_counter = 0  # global running index across all output records

    for json_file in input_dir.glob('*.json'):
        if result := convert_medical_json(json_file):
            # Re-number each record with the global index.
            for item in result:
                item["index"] = index_counter
                index_counter += 1
            all_data.extend(result)
            success_count += len(result)
        else:
            failed_files.append(json_file.name)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_data, f, indent=2, ensure_ascii=False)

    print(f"转换完成: 成功 {success_count} 个文件,失败 {len(failed_files)} 个")
    if failed_files:
        print("失败文件列表:\n" + "\n".join(failed_files))


if __name__ == "__main__":
    batch_convert(
        input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/general/lisa/image/val",
        output_file="/mnt/data/users/zys/proj/vlm_reasoning/utils/json/converted_dataset3.json"
    )