Spaces:
Sleeping
Sleeping
| import json | |
| import math | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| import numpy as np | |
| import tensorflow as tf | |
@dataclass
class CocoYoloDataset:
    """COCO-style annotations exposed as ``tf.data`` pipelines with grid labels.

    The annotation JSON is expected to contain an ``"images"`` list (entries
    with ``id``, ``file_name``, ``width``, ``height``) and an ``"annotations"``
    list (entries with ``image_id``, ``bbox`` and ``category_id``).

    Attributes:
        images_path: Directory holding the image files (``~`` is expanded).
        annotation_file: Path to the annotation JSON (``~`` is expanded).
        grid_size: Number of cells per side of the square label grid.
        max_objects_per_image: Samples with more boxes than this are dropped
            by :meth:`training_ds`.
    """

    images_path: str
    annotation_file: str
    grid_size: int = 6
    max_objects_per_image: int = 4
    # Derived state, filled in by __post_init__ / _load_samples; never passed
    # to the constructor and hidden from the repr.
    _images_path: Path = field(init=False, repr=False)
    _annotation_file: Path = field(init=False, repr=False)
    _samples: list[dict] = field(init=False, repr=False, default_factory=list)

    def __post_init__(self):
        """Turn the user-supplied path strings into expanded ``Path`` objects."""
        self._images_path = Path(self.images_path).expanduser()
        self._annotation_file = Path(self.annotation_file).expanduser()

    def sample_ds(self) -> tf.data.Dataset:
        """Return a dataset of raw samples, one element per annotated image.

        Each element is a dict with ``"path"`` (scalar string), ``"boxes"``
        (``(n, 4)`` float32, as produced by :meth:`scale_box`) and
        ``"labels"`` (``(n,)`` int32 category ids).
        """
        samples = self._load_samples()
        return tf.data.Dataset.from_generator(
            lambda: iter(samples),
            output_signature={
                "path": tf.TensorSpec(shape=(), dtype=tf.string),
                "boxes": tf.TensorSpec(shape=(None, 4), dtype=tf.float32),
                "labels": tf.TensorSpec(shape=(None,), dtype=tf.int32),
            },
        )

    def training_ds(self, batch_size: int, preprocessor) -> tf.data.Dataset:
        """Return a batched, prefetched dataset of ``(image, targets)`` pairs.

        Args:
            batch_size: Number of samples per batch.
            preprocessor: Callable applied to each decoded image (as a batch
                of one) inside :meth:`_load_image`.

        Returns:
            A dataset yielding ``(image, {"box": ..., "class": ...})`` batches.
        """
        dataset = self.sample_ds()
        # Drop crowded images: only samples with at most
        # ``max_objects_per_image`` boxes are kept.
        dataset = dataset.filter(
            lambda sample: tf.shape(sample["boxes"])[0] <= self.max_objects_per_image
        )
        dataset = dataset.map(
            lambda sample: self._build_training_sample(sample, preprocessor),
            num_parallel_calls=tf.data.AUTOTUNE,
        )
        return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    def _build_training_sample(self, sample: dict, preprocessor) -> tuple[tf.Tensor, dict]:
        """Convert one raw sample into a preprocessed image and its label grids."""
        image = self._load_image(sample["path"], preprocessor)
        box_array, class_array = tf.numpy_function(
            self._build_label_arrays_numpy,
            [sample["boxes"], sample["labels"]],
            [tf.float32, tf.int32],
        )
        # numpy_function outputs carry no static shape; restore it so the
        # elements can be batched downstream.
        box_array.set_shape((self.grid_size, self.grid_size, 5))
        class_array.set_shape((self.grid_size, self.grid_size))
        return image, {"box": box_array, "class": class_array}

    def _load_samples(self) -> list[dict]:
        """Parse the annotation file into per-image samples, caching the result.

        Images that have no annotation entries produce no sample. The cache
        is only reused when it is non-empty.

        Returns:
            A list of dicts with ``"path"`` (str), ``"boxes"`` (float32 array
            of scaled ``[x, y, w, h]`` rows) and ``"labels"`` (int32 array).
        """
        if self._samples:
            return self._samples

        with self._annotation_file.open("r", encoding="utf-8") as file:
            annotations = json.load(file)

        images = {image["id"]: image for image in annotations["images"]}
        metadata: dict = {}
        for annotation in annotations["annotations"]:
            image_id = annotation["image_id"]
            image = images[image_id]
            entry = metadata.get(image_id)
            if entry is None:
                entry = metadata[image_id] = {
                    "path": str(self._images_path / image["file_name"]),
                    "boxes": [],
                    "labels": [],
                }
            entry["boxes"].append(
                self.scale_box(annotation["bbox"], image["width"], image["height"])
            )
            entry["labels"].append(annotation["category_id"])

        self._samples = [
            {
                "path": sample["path"],
                "boxes": np.asarray(sample["boxes"], dtype="float32"),
                "labels": np.asarray(sample["labels"], dtype="int32"),
            }
            for sample in metadata.values()
        ]
        return self._samples

    def scale_box(self, box: list[float], width: int, height: int) -> list[float]:
        """Scale a pixel ``[x, y, w, h]`` box by the longer image side.

        All four values are divided by ``max(width, height)``. The origin is
        then shifted along the shorter axis by half the difference between the
        two sides, i.e. as if the image were centred on a square canvas whose
        side equals the longer dimension.

        Args:
            box: ``[x, y, w, h]`` in pixels.
            width: Image width in pixels.
            height: Image height in pixels.

        Returns:
            The scaled ``[x, y, w, h]`` box.
        """
        scale = 1.0 / max(width, height)
        x, y, w, h = [value * scale for value in box]
        if height > width:
            x += (height - width) * scale / 2
        if width > height:
            y += (width - height) * scale / 2
        return [x, y, w, h]

    def to_grid(self, box: list[float]) -> tuple[tuple[int, int], tuple[float, float, float, float]]:
        """Locate a scaled box's centre on the label grid.

        Args:
            box: ``[x, y, w, h]`` as returned by :meth:`scale_box`.

        Returns:
            ``((index_x, index_y), (offset_x, offset_y, w, h))`` where the
            indices are the truncated grid coordinates of the box centre and
            the offsets are the centre's position relative to that cell.
            The indices are not clamped to the grid here.
        """
        x, y, w, h = box
        center_x = (x + w / 2) * self.grid_size
        center_y = (y + h / 2) * self.grid_size
        index_x = int(center_x)
        index_y = int(center_y)
        return (index_x, index_y), (center_x - index_x, center_y - index_y, w, h)

    def _build_label_arrays_numpy(self, boxes: np.ndarray, labels: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Build the per-cell box and class grids for one image.

        Args:
            boxes: ``(n, 4)`` array of scaled ``[x, y, w, h]`` boxes.
            labels: ``(n,)`` array of category ids, aligned with ``boxes``.

        Returns:
            ``(box_array, class_array)``: a float32
            ``(grid_size, grid_size, 5)`` array holding
            ``[offset_x, offset_y, w, h, 1.0]`` in each box's centre cell
            (zeros elsewhere), and an int32 ``(grid_size, grid_size)`` array
            of labels (zero where no box was painted). Both are indexed
            ``[row (y), column (x)]``.
        """
        box_array = np.zeros((self.grid_size, self.grid_size, 5), dtype="float32")
        class_array = np.zeros((self.grid_size, self.grid_size), dtype="int32")

        # Pass 1: paint every cell a box overlaps with that box's label.
        # Later boxes overwrite earlier ones where they overlap.
        for box, label in zip(boxes, labels):
            x, y, w, h = box
            left = max(math.floor(x * self.grid_size), 0)
            right = min(math.ceil((x + w) * self.grid_size), self.grid_size)
            bottom = max(math.floor(y * self.grid_size), 0)
            top = min(math.ceil((y + h) * self.grid_size), self.grid_size)
            class_array[bottom:top, left:right] = label

        # Pass 2: write each box into its centre cell. Running this after
        # pass 1 guarantees the centre cell carries the box's own label.
        for box, label in zip(boxes, labels):
            (index_x, index_y), grid_box = self.to_grid(box.tolist())
            index_x = min(max(index_x, 0), self.grid_size - 1)
            index_y = min(max(index_y, 0), self.grid_size - 1)
            box_array[index_y, index_x] = [*grid_box, 1.0]
            class_array[index_y, index_x] = label

        return box_array, class_array

    @staticmethod
    def _load_image(path: tf.Tensor, preprocessor) -> tf.Tensor:
        """Read, decode and preprocess one JPEG image.

        Args:
            path: Scalar string tensor with the image file path.
            preprocessor: Callable applied to the decoded image after a
                leading batch axis of size one has been added.

        Returns:
            The preprocessed image with the batch axis removed, cast to
            float32.
        """
        image = tf.io.read_file(path)
        image = tf.image.decode_jpeg(image, channels=3)
        # The preprocessor is fed a batch of one; the batch axis is removed
        # again before returning.
        image = tf.expand_dims(image, axis=0)
        image = preprocessor(image)
        image = tf.squeeze(image, axis=0)
        return tf.cast(image, tf.float32)