# tools/utils/json/Lisa.py
# NOTE: the following lines are Hugging Face upload-page residue kept for
# provenance: "Adinosaur's picture" / "Upload folder using huggingface_hub"
# / commit 1c980b1 (verified). They were not valid Python and are commented out.
import json
import math
from pathlib import Path
import numpy as np
import cv2
from pycocotools import mask as mask_utils
from PIL import Image, ExifTags
# EXIF tag id for "Orientation" (fixed by the EXIF spec); avoids scanning
# ExifTags.TAGS on every call just to rediscover this constant.
_EXIF_ORIENTATION_TAG = 274


def get_image_dimensions(image_path):
    """Return (width, height) of an image, honoring EXIF orientation.

    Tries PIL first (fast, and can read EXIF); falls back to OpenCV when
    PIL fails. Never raises: returns (0, 0) as a sentinel when both
    backends fail, so the caller can detect the failure explicitly.

    Args:
        image_path: pathlib.Path to the image file.

    Returns:
        (width, height) tuple. For EXIF orientations 5-8 (which encode a
        90/270-degree rotation) width and height are swapped so the
        returned size matches the displayed image.
    """
    try:
        with Image.open(image_path) as img:
            width, height = img.size
            orientation = 1
            try:
                # _getexif() is a private PIL API and may be absent for
                # some formats; any failure falls back to orientation 1.
                exif = img._getexif() or {}
                orientation = exif.get(_EXIF_ORIENTATION_TAG, 1)
            except Exception as e:
                print(f"EXIF读取警告 [{image_path.name}]: {str(e)}")
            # Orientations 5-8 rotate the raster by 90/270 degrees, so the
            # effective display dimensions are swapped.
            if orientation in (5, 6, 7, 8):
                return height, width
            return width, height
    except Exception as pil_error:
        print(f"PIL读取失败 [{image_path.name}], 尝试OpenCV: {str(pil_error)}")
        try:
            # OpenCV fallback (does not read EXIF; raw raster size only).
            img = cv2.imread(str(image_path))
            if img is not None:
                h, w = img.shape[:2]
                return w, h
            raise ValueError("OpenCV返回空图像")
        except Exception as cv_error:
            print(f"严重错误: 无法获取尺寸 [{image_path.name}]: {str(cv_error)}")
            return (0, 0)  # sentinel; caller must validate before use
def points_to_rle(points, img_dimensions):
    """Convert a polygon annotation to a COCO-style RLE dict.

    Each vertex is rounded to the nearest integer and clamped into the
    image bounds, so slightly out-of-range annotation-tool coordinates
    cannot crash cv2.fillPoly.

    Args:
        points: iterable of (x, y) numeric pairs (polygon vertices).
        img_dimensions: (width, height) of the target image.

    Returns:
        dict with "size" as [height, width] and "counts" as a UTF-8
        decoded pycocotools RLE string.

    Raises:
        ValueError: if fewer than 3 vertices are supplied.
    """
    width, height = img_dimensions
    polygon = []
    for x, y in points:
        # Round first, then clamp into [0, dim-1] (safe for annotation
        # tools that emit coordinates just outside the image).
        safe_x = min(max(0, int(round(x))), width - 1)
        safe_y = min(max(0, int(round(y))), height - 1)
        polygon.append((safe_x, safe_y))
    # Validate BEFORE allocating the mask: a polygon needs >= 3 vertices.
    if len(polygon) < 3:
        raise ValueError("无效多边形,点数不足3个")
    mask = np.zeros((height, width), dtype=np.uint8)
    cv2.fillPoly(mask, [np.array(polygon, dtype=np.int32)], color=1)
    rle = mask_utils.encode(np.asfortranarray(mask))
    return {
        "size": [height, width],
        "counts": rle['counts'].decode('utf-8')
    }
def convert_medical_json(input_file, config=None):
    """Convert one LabelMe-style annotation JSON into the target schema.

    The sibling image is expected next to the JSON with a ``.jpg``
    suffix. Only shapes labeled ``'target'`` are converted; shapes whose
    polygon is invalid are skipped with a warning.

    Args:
        input_file: path to the annotation JSON file.
        config: optional dict overriding "task_type" / "source" /
            "domain" defaults.

    Returns:
        A single-element list holding the converted record, or None on
        any failure (missing image, unreadable size, bad JSON, ...).
    """
    cfg = {
        "task_type": "Image-Segmentation",
        "source": "Lisa",
        "domain": "General",
        **(config or {})
    }
    # Resolve the path BEFORE the try block so the except handler below can
    # always reference input_path.name without risking a NameError that
    # would mask the original exception.
    input_path = Path(input_file)
    try:
        image_path = input_path.with_suffix('.jpg')
        # Hard requirement: the companion image must exist.
        if not image_path.exists():
            raise FileNotFoundError(f"关联图片不存在: {image_path.name}")
        # Relative dataset path recorded in the output record.
        media_paths = (Path(".") / "data" / cfg['source'] / image_path.name).as_posix()
        media_paths = f"./{media_paths}"
        # Real dimensions (EXIF orientation already handled).
        width, height = get_image_dimensions(image_path)
        if width == 0 or height == 0:
            raise ValueError("获取图片尺寸失败")
        # Parse the annotation payload.
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        annotations = []
        for shape in raw_data.get('shapes', []):
            # Only 'target'-labeled shapes are converted.
            if shape.get('label') != 'target':
                continue
            points = shape.get('points', [])
            try:
                rle = points_to_rle(points, (width, height))
                annotations.append({
                    "bbox": [],
                    "segmentation": rle,
                    "category_name": ""
                })
            except ValueError as e:
                # Invalid polygon: skip this shape, keep the rest.
                print(f"标注跳过 [{input_path.name}]: {str(e)}")
        # "index" is a placeholder; batch_convert reassigns a global index.
        return [{
            "index": 0,
            "media_type": "image",
            "media_paths": media_paths,
            "description": "",
            "task_type": cfg['task_type'],
            "question": raw_data.get('text', []),
            "question_type": "detection-form",
            "options": [],
            "annotations": [annotations],
            "answer": [],
            "source": cfg['source'],
            "domain": cfg['domain']
        }]
    except Exception as e:
        print(f"转换失败 [{input_path.name}]: {str(e)}")
        return None
def batch_convert(input_dir, output_file):
    """Convert every ``*.json`` in *input_dir* and write one merged JSON.

    Assigns a globally increasing "index" to every output entry, dumps
    all successful conversions to *output_file*, and prints a summary
    plus the names of any files that failed.

    Args:
        input_dir: directory containing annotation JSON files.
        output_file: path of the merged output JSON.
    """
    input_dir = Path(input_dir)
    all_data = []
    success_count = 0   # number of source FILES converted successfully
    failed_files = []
    index_counter = 0   # global running index across all output entries
    for json_file in input_dir.glob('*.json'):
        if result := convert_medical_json(json_file):
            # Reassign each entry a unique, monotonically increasing index.
            for item in result:
                item["index"] = index_counter
                index_counter += 1
            all_data.extend(result)
            # Count files (not entries) so the summary message below,
            # which reports "个文件", is accurate.
            success_count += 1
        else:
            failed_files.append(json_file.name)
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_data, f, indent=2, ensure_ascii=False)
    print(f"转换完成: 成功 {success_count} 个文件,失败 {len(failed_files)} 个")
    if failed_files:
        print("失败文件列表:\n" + "\n".join(failed_files))
if __name__ == "__main__":
    # Script entry point: converts the LISA validation split.
    # NOTE(review): paths are hard-coded to a specific cluster layout —
    # consider argparse/env vars if this is run outside that environment.
    batch_convert(
        input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/general/lisa/image/val",
        output_file="/mnt/data/users/zys/proj/vlm_reasoning/utils/json/converted_dataset3.json"
    )