# File size: 5,886 Bytes
# 1c980b1
import json
import math
from pathlib import Path
import numpy as np
import cv2
from pycocotools import mask as mask_utils
from PIL import Image, ExifTags
def get_image_dimensions(image_path):
    """Return (width, height) of an image, robust to EXIF orientation and decode failures.

    Tries PIL first. EXIF Orientation values 5-8 mean the pixels are stored
    rotated by 90/270 degrees, so width and height are swapped before
    returning. If PIL cannot open the file, falls back to OpenCV. Returns
    (0, 0) when both backends fail so the caller can detect the error
    without this helper crashing the batch run.

    Args:
        image_path: pathlib.Path to the image file (``.name`` is used in logs).

    Returns:
        (width, height) tuple of ints; (0, 0) on total failure.
    """
    # Standard EXIF tag id for 'Orientation' (see the EXIF 2.3 spec).
    ORIENTATION_TAG = 274
    try:
        with Image.open(image_path) as img:
            width, height = img.size
            orientation = 1
            try:
                # getexif() is Pillow's public API; the original code used the
                # private _getexif() plus an O(n) scan of ExifTags.TAGS to
                # locate the Orientation tag.
                orientation = img.getexif().get(ORIENTATION_TAG, 1)
            except Exception as e:
                print(f"EXIF读取警告 [{image_path.name}]: {str(e)}")
            # Orientations 5-8 encode a 90°/270° rotation: swap dimensions.
            if orientation in [5, 6, 7, 8]:
                return height, width
            else:
                return width, height
    except Exception as pil_error:
        print(f"PIL读取失败 [{image_path.name}], 尝试OpenCV: {str(pil_error)}")
        try:
            # Fallback decoder: OpenCV returns None (not an exception) on failure.
            img = cv2.imread(str(image_path))
            if img is not None:
                h, w = img.shape[:2]
                return w, h
            raise ValueError("OpenCV返回空图像")
        except Exception as cv_error:
            print(f"严重错误: 无法获取尺寸 [{image_path.name}]: {str(cv_error)}")
            return (0, 0)  # invalid size sentinel; caller validates and raises
def points_to_rle(points, img_dimensions):
    """Rasterize a polygon into a binary mask and return it as a COCO RLE dict.

    Each vertex is rounded to the nearest integer and clamped into
    [0, width-1] x [0, height-1] so slightly out-of-bounds annotations
    cannot crash mask generation.

    Args:
        points: iterable of (x, y) float pairs.
        img_dimensions: (width, height) of the target image.

    Returns:
        dict with "size" == [height, width] and a UTF-8 "counts" string.

    Raises:
        ValueError: if fewer than 3 vertices are supplied.
    """
    width, height = img_dimensions
    # Round first, then clamp (strategy chosen to match the annotation tool).
    clamped = [
        (min(max(0, int(round(px))), width - 1),
         min(max(0, int(round(py))), height - 1))
        for px, py in points
    ]
    # A polygon needs at least three vertices to enclose any area.
    if len(clamped) < 3:
        raise ValueError(f"无效多边形,点数不足3个")
    canvas = np.zeros((height, width), dtype=np.uint8)
    cv2.fillPoly(canvas, [np.array(clamped, dtype=np.int32)], color=1)
    # pycocotools requires a Fortran-contiguous array for RLE encoding.
    encoded = mask_utils.encode(np.asfortranarray(canvas))
    return {
        "size": [height, width],
        "counts": encoded['counts'].decode('utf-8'),
    }
def convert_medical_json(input_file, config=None):
    """Convert one LabelMe-style JSON annotation file into the dataset schema.

    Looks for a sibling ``.jpg`` next to *input_file*, reads its true
    dimensions (EXIF-aware), converts every shape labeled 'target' into a
    COCO RLE annotation, and wraps the result in a single-entry list.

    Args:
        input_file: path to the source ``.json`` annotation file.
        config: optional dict overriding "task_type"/"source"/"domain".

    Returns:
        A one-element list with the converted record, or None on failure
        (the error is printed, not raised, so batch runs continue).
    """
    cfg = {
        "task_type": "Image-Segmentation",
        "source": "Lisa",
        "domain": "General",
        **(config or {})
    }
    # BUGFIX: bind input_path BEFORE the try block. The except handler below
    # references input_path.name; if it were assigned inside the try and an
    # early failure occurred, the handler itself would raise NameError and
    # mask the real error.
    input_path = Path(input_file)
    try:
        image_path = input_path.with_suffix('.jpg')
        # The image must exist: its real size is needed for mask generation.
        if not image_path.exists():
            raise FileNotFoundError(f"关联图片不存在: {image_path.name}")
        media_paths = (Path(".") / "data" / cfg['source'] / image_path.name).as_posix()
        media_paths = f"./{media_paths}"
        # True dimensions (EXIF orientation already handled by the helper).
        width, height = get_image_dimensions(image_path)
        if width == 0 or height == 0:
            raise ValueError("获取图片尺寸失败")
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        annotations = []
        for shape in raw_data.get('shapes', []):
            # Only shapes explicitly labeled 'target' are converted.
            if shape.get('label') != 'target':
                continue
            points = shape.get('points', [])
            try:
                rle = points_to_rle(points, (width, height))
                annotations.append({
                    "bbox": [],
                    "segmentation": rle,
                    "category_name": ""
                })
            except ValueError as e:
                # Skip invalid polygons but keep processing the file.
                print(f"标注跳过 [{input_path.name}]: {str(e)}")
        return [{
            "index": 0,  # placeholder; batch_convert assigns the global index
            "media_type": "image",
            "media_paths": media_paths,
            "description": "",
            "task_type": cfg['task_type'],
            "question": raw_data.get('text', []),
            "question_type": "detection-form",
            "options": [],
            "annotations": [annotations],
            "answer": [],
            "source": cfg['source'],
            "domain": cfg['domain']
        }]
    except Exception as e:
        print(f"转换失败 [{input_path.name}]: {str(e)}")
        return None
def batch_convert(input_dir, output_file):
    """Convert every ``*.json`` file in *input_dir* into one merged JSON list.

    Each successfully converted entry receives a globally increasing
    "index". Files that fail to convert are collected and reported after
    the merged output has been written.

    Args:
        input_dir: directory scanned (non-recursively) for ``*.json`` files.
        output_file: path of the merged JSON array to write.
    """
    source_dir = Path(input_dir)
    merged = []
    converted = 0
    failures = []
    next_index = 0  # running index shared across all converted files
    for annotation_file in source_dir.glob('*.json'):
        entries = convert_medical_json(annotation_file)
        if entries:
            # Re-number every entry with the global counter.
            for entry in entries:
                entry["index"] = next_index
                next_index += 1
            merged.extend(entries)
            converted += len(entries)
        else:
            failures.append(annotation_file.name)
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(merged, f, indent=2, ensure_ascii=False)
    print(f"转换完成: 成功 {converted} 个文件,失败 {len(failures)} 个")
    if failures:
        print("失败文件列表:\n" + "\n".join(failures))
if __name__ == "__main__":
    # Script entry point: batch-convert the Lisa validation split into one
    # merged dataset file. Paths are hard-coded for this environment.
    batch_convert(
        input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/general/lisa/image/val",
        output_file="/mnt/data/users/zys/proj/vlm_reasoning/utils/json/converted_dataset3.json"
    )