|
|
""" |
|
|
Creates a PyTorch dataset and a Lightning data module to load the Pascal VOC & MS COCO datasets
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
import os |
|
|
import pandas as pd |
|
|
import torch |
|
|
import random |
|
|
from PIL import Image, ImageFile |
|
|
|
|
|
import lightning as L |
|
|
from torch.utils.data import Dataset, DataLoader |
|
|
import config as config |
|
|
|
|
|
from utils.utils import xywhn2xyxy, xyxy2xywhn |
|
|
|
|
|
from utils.utils import ( |
|
|
cells_to_bboxes, |
|
|
iou_width_height as iou, |
|
|
non_max_suppression as nms, |
|
|
plot_image, |
|
|
) |
|
|
|
|
|
|
|
|
ImageFile.LOAD_TRUNCATED_IMAGES = True |
|
|
|
|
|
|
|
|
class YOLODataset(Dataset):
    """Dataset producing images and per-scale YOLO target tensors.

    Each row of the annotation CSV names an image file (column 0, relative to
    ``img_dir``) and a label file (column 1, relative to ``label_dir``).
    Label files are space-delimited with five columns; the first column is
    rolled to the end on load so that every box becomes
    ``[x, y, width, height, class_label]``.
    """

    def __init__(
        self,
        csv_file,
        img_dir,
        label_dir,
        anchors,
        image_size=416,
        S=None,
        C=20,
        transform=None,
    ):
        """Read the annotation index and store the target-building settings.

        Args:
            csv_file: Path of the CSV read with ``pandas.read_csv``.
            img_dir: Directory joined with the CSV's first column.
            label_dir: Directory joined with the CSV's second column.
            anchors: Three sequences (one per scale) that are concatenated
                with ``+`` into a single anchor tensor. Presumably
                ``(width, height)`` pairs -- they are compared against
                ``box[2:4]`` via ``iou_width_height``.
            image_size: Side length used to size the mosaic canvas
                (``2 * image_size`` per side).
            S: Grid sizes, one per scale. Defaults to ``[13, 26, 52]``.
            C: Number of classes (stored; not read elsewhere in this class).
            transform: Optional callable invoked as
                ``transform(image=..., bboxes=...)`` and expected to return a
                mapping with ``"image"`` and ``"bboxes"`` keys.
        """
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.image_size = image_size
        self.mosaic_border = [image_size // 2, image_size // 2]
        self.transform = transform
        # A fresh list per instance: a list literal as the default argument
        # would be shared (and mutable) across every dataset object.
        self.S = [13, 26, 52] if S is None else list(S)
        self.anchors = torch.tensor(anchors[0] + anchors[1] + anchors[2])
        self.num_anchors = self.anchors.shape[0]
        self.num_anchors_per_scale = self.num_anchors // 3
        self.C = C
        self.ignore_iou_thresh = 0.5

    def __len__(self):
        """Return the number of rows in the annotation CSV."""
        return len(self.annotations)

    def _load_sample(self, index):
        """Load the RGB image array and the box list for annotation row *index*."""
        label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
        # Roll by 4 along the columns so a 5-column row has its first value
        # moved to the last position.
        bboxes = np.roll(
            np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1
        ).tolist()
        img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
        image = np.array(Image.open(img_path).convert("RGB"))
        return image, bboxes

    def load_mosaic(self, index):
        """Build a 4-image mosaic containing sample *index* and three random others.

        Returns:
            ``(img4, labels4)`` where ``img4`` is a ``uint8`` canvas of shape
            ``(2 * image_size, 2 * image_size, channels)`` filled with 114
            where no image was pasted, and ``labels4`` is an ``(N, 5)`` array
            of boxes whose first four columns are clipped to ``[0, 1]``.
            Boxes whose third or fourth column is not positive are dropped.
        """
        labels4 = []
        s = self.image_size
        # Mosaic centre; with the border at s // 2 it falls in [s/2, 3s/2).
        yc, xc = (int(random.uniform(x, 2 * s - x)) for x in self.mosaic_border)
        indices = [index] + random.choices(range(len(self)), k=3)
        random.shuffle(indices)

        for i, sample_idx in enumerate(indices):
            img, bboxes = self._load_sample(sample_idx)
            h, w = img.shape[0], img.shape[1]
            labels = np.array(bboxes)

            # (x1a, y1a, x2a, y2a) is the destination window on the canvas,
            # (x1b, y1b, x2b, y2b) the matching source window in the image.
            if i == 0:  # quadrant above-left of the centre
                img4 = np.full((s * 2, s * 2, img.shape[2]), 114, dtype=np.uint8)
                x1a, y1a, x2a, y2a = max(xc - w, 0), max(yc - h, 0), xc, yc
                x1b, y1b, x2b, y2b = w - (x2a - x1a), h - (y2a - y1a), w, h
            elif i == 1:  # above-right
                x1a, y1a, x2a, y2a = xc, max(yc - h, 0), min(xc + w, s * 2), yc
                x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h
            elif i == 2:  # below-left
                x1a, y1a, x2a, y2a = max(xc - w, 0), yc, xc, min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, w, min(y2a - y1a, h)
            elif i == 3:  # below-right
                x1a, y1a, x2a, y2a = xc, yc, min(xc + w, s * 2), min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h)

            img4[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b]
            padw = x1a - x1b
            padh = y1a - y1b

            # An empty label file yields a size-0 array that cannot be sliced
            # or concatenated with the 2-D label arrays, so it is skipped.
            if labels.size:
                # Presumably converts normalised xywh to canvas-pixel xyxy,
                # shifted by the paste offset -- see utils.xywhn2xyxy.
                labels[:, :-1] = xywhn2xyxy(labels[:, :-1], w, h, padw, padh)
                labels4.append(labels)

        if not labels4:
            # None of the four samples had boxes.
            return img4, np.zeros((0, 5))

        labels4 = np.concatenate(labels4, 0)
        coords = labels4[:, :-1]  # view: clipping in place updates labels4
        np.clip(coords, 0, 2 * s, out=coords)

        labels4[:, :-1] = xyxy2xywhn(labels4[:, :-1], 2 * s, 2 * s)
        labels4[:, :-1] = np.clip(labels4[:, :-1], 0, 1)
        labels4 = labels4[labels4[:, 2] > 0]
        labels4 = labels4[labels4[:, 3] > 0]
        return img4, labels4

    def __getitem__(self, index):
        """Return ``(image, targets)`` for sample *index*.

        ``targets`` is a tuple with one tensor per entry of ``self.S``, each of
        shape ``(num_anchors_per_scale, S, S, 6)``. The last axis holds
        ``[objectness, x_cell, y_cell, width_cell, height_cell, class]``, where
        objectness is 1 for an assigned anchor, -1 for an unassigned anchor
        whose IoU with the box exceeds ``ignore_iou_thresh``, and 0 otherwise.
        """
        # NOTE(review): the mosaic branch runs when the draw is >= P_MOSAIC,
        # i.e. with probability 1 - P_MOSAIC if that constant is meant as the
        # mosaic probability -- confirm the intended direction in config.
        if random.random() >= config.P_MOSAIC:
            image, bboxes = self.load_mosaic(index)
        else:
            image, bboxes = self._load_sample(index)

        if self.transform:
            augmentations = self.transform(image=image, bboxes=bboxes)
            image = augmentations["image"]
            bboxes = augmentations["bboxes"]

        targets = [
            torch.zeros((self.num_anchors_per_scale, size, size, 6))
            for size in self.S
        ]
        for box in bboxes:
            iou_anchors = iou(torch.tensor(box[2:4]), self.anchors)
            # Best-matching anchors first; plain ints keep the index
            # arithmetic below out of tensor space.
            anchor_indices = iou_anchors.argsort(descending=True, dim=0).tolist()
            x, y, width, height, class_label = box
            has_anchor = [False] * 3  # one flag per scale
            for anchor_idx in anchor_indices:
                scale_idx = anchor_idx // self.num_anchors_per_scale
                anchor_on_scale = anchor_idx % self.num_anchors_per_scale
                S = self.S[scale_idx]
                # Clamp so a centre lying exactly on the far edge (1.0) maps
                # to the last cell instead of indexing out of bounds.
                i = min(int(S * y), S - 1)
                j = min(int(S * x), S - 1)
                anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]
                if not anchor_taken and not has_anchor[scale_idx]:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = 1
                    x_cell, y_cell = S * x - j, S * y - i
                    width_cell, height_cell = width * S, height * S
                    box_coordinates = torch.tensor(
                        [x_cell, y_cell, width_cell, height_cell]
                    )
                    targets[scale_idx][anchor_on_scale, i, j, 1:5] = box_coordinates
                    targets[scale_idx][anchor_on_scale, i, j, 5] = int(class_label)
                    has_anchor[scale_idx] = True
                elif (
                    not anchor_taken
                    and iou_anchors[anchor_idx] > self.ignore_iou_thresh
                ):
                    # Good-but-not-best match: mark it to be ignored.
                    targets[scale_idx][anchor_on_scale, i, j, 0] = -1

        return image, tuple(targets)
|
|
|
|
|
|
|
|
def test():
    """Manually inspect dataset targets by decoding and plotting them.

    Builds a ``YOLODataset`` over the hard-coded COCO paths, then for every
    batch of size 1 converts the per-scale targets back to boxes with
    ``cells_to_bboxes``, filters them with ``nms`` and shows the image via
    ``plot_image``. Tensor shapes and the final boxes are printed.
    """
    anchors = config.ANCHORS
    transform = config.test_transforms
    S = [13, 26, 52]

    dataset = YOLODataset(
        "COCO/train.csv",
        "COCO/images/images/",
        "COCO/labels/labels_new/",
        S=S,
        anchors=anchors,
        transform=transform,
    )
    # Dividing by 1/S scales each anchor by the grid size of its scale.
    scaled_anchors = torch.tensor(anchors) / (
        1 / torch.tensor(S).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
    )
    loader = DataLoader(dataset=dataset, batch_size=1, shuffle=True)
    for x, y in loader:
        boxes = []
        # Walk the scales themselves; the anchors-per-scale dimension
        # (y[0].shape[1]) only coincidentally equals the number of scales.
        for scale_targets, anchor in zip(y, scaled_anchors):
            print(anchor.shape)
            print(scale_targets.shape)
            boxes += cells_to_bboxes(
                scale_targets,
                is_preds=False,
                S=scale_targets.shape[2],
                anchors=anchor,
            )[0]
        boxes = nms(boxes, iou_threshold=1, threshold=0.7, box_format="midpoint")
        print(boxes)
        plot_image(x[0].permute(1, 2, 0).to("cpu"), boxes)
|
|
|
|
|
|
|
|
class PascalDataModule(L.LightningDataModule):
    """Lightning data module wrapping ``YOLODataset`` for train/val/test.

    Image and label directories, anchors, transforms and all ``DataLoader``
    settings are read from ``config``. The ``batch_size``, ``shuffle`` and
    ``num_workers`` constructor arguments are stored on the instance but the
    loaders built here use ``config.BATCH_SIZE`` / ``config.NUM_WORKERS`` and
    fixed shuffle flags instead.
    """

    def __init__(
        self,
        train_csv_path=None,
        test_csv_path=None,
        batch_size=512,
        shuffle=True,
        num_workers=4,
    ) -> None:
        """Store the CSV paths and loader options; no data is read here."""
        super().__init__()
        self.train_csv_path = train_csv_path
        self.test_csv_path = test_csv_path
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.num_workers = num_workers
        self.IMAGE_SIZE = config.IMAGE_SIZE

    def prepare_data(self) -> None:
        """No download or preprocessing step is performed."""

    def _make_yolo_dataset(self, csv_path, transform):
        """Build a ``YOLODataset`` with grid sizes IMAGE_SIZE // 32, // 16, // 8."""
        size = self.IMAGE_SIZE
        grid_sizes = [size // 32, size // 16, size // 8]
        return YOLODataset(
            csv_path,
            transform=transform,
            S=grid_sizes,
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
        )

    def _make_loader(self, dataset, shuffle):
        """Wrap *dataset* in a ``DataLoader`` configured from ``config``."""
        return DataLoader(
            dataset=dataset,
            batch_size=config.BATCH_SIZE,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            shuffle=shuffle,
            drop_last=False,
        )

    def setup(self, stage=None):
        """Create the train, validation and test datasets.

        Validation and test both read ``test_csv_path`` with the test
        transforms; ``stage`` is ignored.
        """
        self.train_dataset = self._make_yolo_dataset(
            self.train_csv_path, config.train_transforms
        )
        self.val_dataset = self._make_yolo_dataset(
            self.test_csv_path, config.test_transforms
        )
        self.test_dataset = self._make_yolo_dataset(
            self.test_csv_path, config.test_transforms
        )

    def train_dataloader(self):
        """Return the shuffled training loader."""
        return self._make_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        """Return the unshuffled validation loader."""
        return self._make_loader(self.val_dataset, shuffle=False)

    def test_dataloader(self):
        """Return the unshuffled test loader."""
        return self._make_loader(self.test_dataset, shuffle=False)
|
|
|