yetrun's picture
ver2: 扩展 CV 训练框架,支持分类、分割与目标检测任务
14f6839
Raw
History Blame Contribute Delete
5.57 kB
import json
import math
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
import tensorflow as tf
@dataclass
class CocoYoloDataset:
    """Build YOLO-style grid training data from a COCO-format annotation file.

    Boxes are read from the annotation JSON, rescaled by the longer image side
    (see ``scale_box``) and encoded onto a ``grid_size`` x ``grid_size`` label
    grid (see ``_build_label_arrays_numpy``).

    Attributes:
        images_path: Directory that each image ``file_name`` is joined onto;
            a leading ``~`` is expanded.
        annotation_file: Path of the annotation JSON; a leading ``~`` is
            expanded.
        grid_size: Number of cells along each side of the label grid.
        max_objects_per_image: ``training_ds`` drops samples that have more
            boxes than this.
    """

    images_path: str
    annotation_file: str
    grid_size: int = 6
    max_objects_per_image: int = 4
    # Expanded ``Path`` versions of the two public path strings, set in
    # ``__post_init__``.
    _images_path: Path = field(init=False, repr=False)
    _annotation_file: Path = field(init=False, repr=False)
    # Cache of parsed samples, filled on the first ``_load_samples`` call.
    _samples: list[dict] = field(init=False, repr=False, default_factory=list)

    def __post_init__(self):
        """Resolve the user-supplied path strings into expanded ``Path`` objects."""
        self._images_path = Path(self.images_path).expanduser()
        self._annotation_file = Path(self.annotation_file).expanduser()

    def sample_ds(self) -> tf.data.Dataset:
        """Return a dataset of raw samples, one element per annotated image.

        Each element is a dict with:
            "path":   scalar string, the image file path.
            "boxes":  float32 array of shape (n, 4), boxes from ``scale_box``.
            "labels": int32 array of shape (n,), the annotation category ids.
        """
        samples = self._load_samples()
        return tf.data.Dataset.from_generator(
            # from_generator needs a callable that yields a fresh iterator
            # every time the dataset is (re)iterated.
            lambda: iter(samples),
            output_signature={
                "path": tf.TensorSpec(shape=(), dtype=tf.string),
                "boxes": tf.TensorSpec(shape=(None, 4), dtype=tf.float32),
                "labels": tf.TensorSpec(shape=(None,), dtype=tf.int32),
            },
        )

    def training_ds(self, batch_size: int, preprocessor) -> tf.data.Dataset:
        """Return a batched, prefetched dataset of ``(image, labels)`` pairs.

        Args:
            batch_size: Number of samples per batch.
            preprocessor: Callable applied to a batch of one decoded image
                (see ``_load_image``).

        Returns:
            Dataset yielding ``(image, {"box": ..., "class": ...})`` batches.
            Samples with more than ``max_objects_per_image`` boxes are
            filtered out before encoding.
        """
        dataset = self.sample_ds()
        dataset = dataset.filter(
            lambda sample: tf.shape(sample["boxes"])[0] <= self.max_objects_per_image
        )
        dataset = dataset.map(
            lambda sample: self._build_training_sample(sample, preprocessor),
            num_parallel_calls=tf.data.AUTOTUNE,
        )
        return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    def _build_training_sample(self, sample: dict, preprocessor) -> tuple[tf.Tensor, dict]:
        """Turn one raw sample into ``(image, {"box": ..., "class": ...})``."""
        image = self._load_image(sample["path"], preprocessor)
        # The grid encoding is plain NumPy/Python, so it is wrapped with
        # tf.numpy_function to be usable inside Dataset.map.
        box_array, class_array = tf.numpy_function(
            self._build_label_arrays_numpy,
            [sample["boxes"], sample["labels"]],
            [tf.float32, tf.int32],
        )
        # numpy_function outputs carry no static shape; restore it so that
        # batching and downstream code see fixed shapes.
        box_array.set_shape((self.grid_size, self.grid_size, 5))
        class_array.set_shape((self.grid_size, self.grid_size))
        return image, {"box": box_array, "class": class_array}

    def _load_samples(self) -> list[dict]:
        """Parse the annotation file into per-image samples, caching the result.

        Only images referenced by at least one annotation produce a sample.
        An empty cached list is treated as "not loaded yet", so a file with
        no annotations is re-read on every call.

        Raises:
            KeyError: If an annotation references an ``image_id`` that is
                missing from the ``images`` section, or a required key is
                absent.
        """
        if self._samples:
            return self._samples

        with self._annotation_file.open("r", encoding="utf-8") as file:
            annotations = json.load(file)

        images = {image["id"]: image for image in annotations["images"]}
        metadata = {}
        for annotation in annotations["annotations"]:
            image_id = annotation["image_id"]
            image = images[image_id]
            if image_id not in metadata:
                metadata[image_id] = {
                    "path": str(self._images_path / image["file_name"]),
                    "boxes": [],
                    "labels": [],
                }
            entry = metadata[image_id]
            entry["boxes"].append(
                self.scale_box(annotation["bbox"], image["width"], image["height"])
            )
            entry["labels"].append(annotation["category_id"])

        self._samples = [
            {
                "path": sample["path"],
                "boxes": np.asarray(sample["boxes"], dtype="float32"),
                "labels": np.asarray(sample["labels"], dtype="int32"),
            }
            for sample in metadata.values()
        ]
        return self._samples

    def scale_box(self, box: list[float], width: int, height: int) -> list[float]:
        """Rescale an ``[x, y, w, h]`` box by the longer image side.

        All four values are divided by ``max(width, height)``. The shorter
        axis is then shifted by half the side difference, which is the
        position the box would have if the image were centered on a square
        of side ``max(width, height)``.
        NOTE(review): this presumably mirrors a pad-to-square step in the
        preprocessor; confirm against the preprocessor actually used.

        Args:
            box: ``[x, y, w, h]`` with ``(x, y)`` the minimum corner.
            width: Image width, in the same units as ``box``.
            height: Image height, in the same units as ``box``.

        Returns:
            The rescaled ``[x, y, w, h]`` list.
        """
        scale = 1.0 / max(width, height)
        x, y, w, h = [value * scale for value in box]
        if height > width:
            x += (height - width) * scale / 2
        if width > height:
            y += (width - height) * scale / 2
        return [x, y, w, h]

    def to_grid(self, box: list[float]) -> tuple[tuple[int, int], tuple[float, float, float, float]]:
        """Locate a box center on the grid.

        Args:
            box: ``[x, y, w, h]`` as produced by ``scale_box``.

        Returns:
            ``((index_x, index_y), (offset_x, offset_y, w, h))`` where the
            index is the truncated center in grid units and the offsets are
            the center position relative to that index. The index is NOT
            clamped here and may lie outside the grid for out-of-range boxes.
        """
        x, y, w, h = box
        center_x = (x + w / 2) * self.grid_size
        center_y = (y + h / 2) * self.grid_size
        index_x = int(center_x)
        index_y = int(center_y)
        return (index_x, index_y), (center_x - index_x, center_y - index_y, w, h)

    def _build_label_arrays_numpy(self, boxes: np.ndarray, labels: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Encode boxes and labels onto the label grid.

        Args:
            boxes: Array of ``[x, y, w, h]`` rows as produced by ``scale_box``.
            labels: One class id per row of ``boxes``.

        Returns:
            ``(box_array, class_array)``:
            - ``box_array``: float32, shape (grid_size, grid_size, 5). The
              cell holding a box center stores
              ``[offset_x, offset_y, w, h, 1.0]``; every other cell is zero.
            - ``class_array``: int32, shape (grid_size, grid_size). Every
              cell overlapped by a box carries that box's label, zero
              elsewhere.
            Both arrays are indexed ``[row (y), column (x)]``. When several
            boxes claim the same cell, the later box wins.
        """
        grid = self.grid_size
        box_array = np.zeros((grid, grid, 5), dtype="float32")
        class_array = np.zeros((grid, grid), dtype="int32")

        # Pass 1: paint every cell touched by a box with that box's label.
        for box, label in zip(boxes, labels):
            x, y, w, h = box
            col_start = max(math.floor(x * grid), 0)
            col_end = min(math.ceil((x + w) * grid), grid)
            row_start = max(math.floor(y * grid), 0)
            row_end = min(math.ceil((y + h) * grid), grid)
            class_array[row_start:row_end, col_start:col_end] = label

        # Pass 2: write box parameters into the cell containing each center.
        # This runs after pass 1 so center cells always keep their own label.
        last = grid - 1
        for box, label in zip(boxes, labels):
            (index_x, index_y), grid_box = self.to_grid(box.tolist())
            offset_x, offset_y, width, height = grid_box
            cell_x = min(max(index_x, 0), last)
            cell_y = min(max(index_y, 0), last)
            # If the index had to be clamped (center on or beyond the grid
            # edge), shift the offset by the same amount so that
            # cell index + offset still equals the true center. For in-range
            # boxes the shift is zero and the offset is untouched.
            offset_x += index_x - cell_x
            offset_y += index_y - cell_y
            box_array[cell_y, cell_x] = [offset_x, offset_y, width, height, 1.0]
            class_array[cell_y, cell_x] = label

        return box_array, class_array

    @staticmethod
    def _load_image(path: tf.Tensor, preprocessor) -> tf.Tensor:
        """Read, decode and preprocess one image file.

        Args:
            path: Scalar string tensor with the image file path.
            preprocessor: Callable invoked on a batch of one image (a leading
                axis of size 1 is added before the call). Its output must keep
                a leading axis of size 1, which is squeezed away afterwards.

        Returns:
            The preprocessed image cast to float32.
        """
        image = tf.io.read_file(path)
        image = tf.image.decode_jpeg(image, channels=3)
        image = tf.expand_dims(image, axis=0)
        image = preprocessor(image)
        image = tf.squeeze(image, axis=0)
        return tf.cast(image, tf.float32)