# GECO2-demo / utils/data.py
# Hosting-page metadata: "Initial commit" 6146368 by jerpelhan, 32.9 kB.
import argparse
import json
import os
import numpy as np
import torch
from PIL import Image
from pycocotools.coco import COCO
from scipy.ndimage import gaussian_filter
from torch.utils.data import Dataset
from torchvision import transforms as T
from torchvision.ops import box_convert
from torchvision.transforms import functional as TVF
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
def tiling_augmentation(img, bboxes, resize, jitter, tile_size, hflip_p, gt_bboxes=None, density_map=None):
    """Build an ``n x n`` mosaic of ``img`` and resize it with ``resize``.

    Args:
        img: image tensor whose last two dimensions are (height, width).
        bboxes: exemplar boxes as ``[x1, y1, x2, y2]`` in ``img`` pixel units.
        resize: callable applied to the mosaic; must expose ``.size`` as
            ``(height, width)`` (e.g. ``torchvision.transforms.Resize``).
        jitter: optional callable applied independently to every image tile.
        tile_size: pair of tensors; ``n`` is the larger of their ceilings.
        hflip_p: accepted for interface compatibility; currently unused.
        gt_bboxes: optional ground-truth boxes in the same format as ``bboxes``.
        density_map: optional density map tiled and resized like the image.

    Returns:
        ``(img, bboxes, density_map)`` or, when ``gt_bboxes`` is given,
        ``(img, bboxes, density_map, gt_bboxes_tiled)``. ``density_map`` is
        ``None`` when none was passed in.
    """

    def make_tile(x, num_tiles, jitter=None):
        rows = []
        for _ in range(num_tiles):
            row = [jitter(x) if jitter is not None else x for _ in range(num_tiles)]
            rows.append(torch.cat(row, dim=-1))
        return torch.cat(rows, dim=-2)

    x_tile, y_tile = tile_size
    y_target, x_target = resize.size
    num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil()))

    img = make_tile(img, num_tiles, jitter=jitter)
    _, h, w = img.shape  # mosaic size before resizing
    img = resize(img)

    if density_map is not None:
        # The density map is a count target, not a colour image: tile it
        # WITHOUT jitter so the per-tile counts are not altered.
        density_map = make_tile(density_map, num_tiles)
        original_sum = density_map.sum()
        density_map = resize(density_map)
        resized_sum = density_map.sum()
        # Re-normalise so the total count survives the resize; skip for an
        # all-zero map to avoid producing NaNs from 0 / 0.
        if resized_sum > 0:
            density_map = density_map / resized_sum * original_sum

    # Same scale (resize.size[0]) for both axes, as in the original code;
    # this is only exact for a square target.
    bboxes = bboxes / torch.tensor([w, h, w, h]) * resize.size[0]

    if gt_bboxes is not None:
        # reshape keeps an empty annotation list broadcastable as (0, 4)
        gt_scaled = gt_bboxes.reshape(-1, 4) / torch.tensor([w, h, w, h]) * resize.size[0]
        x_step = x_target // num_tiles
        y_step = y_target // num_tiles
        # One shifted copy of the boxes per tile (column-major order, which
        # reproduces the former hard-coded 2x2 ordering).
        gt_bboxes_tiled = torch.cat([
            gt_scaled + torch.tensor([col * x_step, row * y_step, col * x_step, row * y_step])
            for col in range(num_tiles)
            for row in range(num_tiles)
        ])
        return img, bboxes, density_map, gt_bboxes_tiled

    return img, bboxes, density_map
def xywh_to_x1y1x2y2(xywh):
    """Convert a ``[x, y, w, h]`` box to corner format ``[x1, y1, x2, y2]``."""
    left, top, width, height = xywh
    return [left, top, left + width, top + height]
def pad_collate(batch):
    """Collate 5-tuples ``(img, bboxes, density_map, image_name, gt_bboxes)``.

    Ground-truth boxes may differ in count per sample, so they are zero-padded
    to a common length; every other field is stacked along a new batch axis.
    """
    imgs, exemplars, density_maps, names, gt_per_sample = zip(*batch)
    padded_gt = pad_sequence(gt_per_sample, batch_first=True, padding_value=0)
    return (
        torch.stack(imgs),
        torch.stack(exemplars),
        torch.stack(density_maps),
        torch.stack(names),
        padded_gt,
    )
def pad_collate_test(batch):
    """Collate 7-tuples that additionally carry scaling factor and padding.

    Ground-truth boxes are zero-padded to a common length, tensor fields are
    stacked, and the per-sample ``scaling_factor`` / ``padwh`` values are
    converted into tensors.
    """
    imgs, exemplars, density_maps, sample_ids, gt_per_sample, scales, pads = zip(*batch)
    padded_gt = pad_sequence(gt_per_sample, batch_first=True, padding_value=0)
    stacked_imgs = torch.stack(imgs)
    stacked_exemplars = torch.stack(exemplars)
    stacked_density = torch.stack(density_maps)
    stacked_ids = torch.stack(sample_ids)
    return (
        stacked_imgs,
        stacked_exemplars,
        stacked_density,
        stacked_ids,
        padded_gt,
        torch.tensor(scales),
        torch.tensor(pads),
    )
class FSC147DATASET(Dataset):
    """FSC-147 samples with exemplar boxes, density maps and ground-truth boxes.

    Files read, all relative to ``data_path``:
      * ``annotations/Train_Test_Val_FSC_147.json`` -- image names per split
      * ``annotations/annotation_FSC147_384.json`` -- exemplar boxes
      * ``annotations/instances_<split>.json`` -- COCO-format ground-truth boxes
      * ``images_384_VarV2/<name>`` -- images
      * ``gt_density_map_adaptive_512_512_object_VarV2/<stem>.npy`` -- density maps

    Each item is the 7-tuple
    ``(img, bboxes, density_map, idx, gt_bboxes, scaling_factor, padwh)``.
    """

    def __init__(
            self, data_path, img_size, split='train', num_objects=3,
            tiling_p=0.5, zero_shot=False, return_ids=False, training=False
    ):
        self.split = split
        self.data_path = data_path
        self.horizontal_flip_p = 0.5
        self.tiling_p = tiling_p
        self.img_size = img_size
        self.resize = T.Resize((img_size, img_size), antialias=True)
        self.resize512 = T.Resize((512, 512), antialias=True)
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
        self.num_objects = num_objects
        self.zero_shot = zero_shot
        self.return_ids = return_ids
        self.training = training

        annotations_dir = os.path.join(self.data_path, 'annotations')
        with open(os.path.join(annotations_dir, 'Train_Test_Val_FSC_147.json'), 'rb') as file:
            splits = json.load(file)
            self.image_names = splits[split]
        with open(os.path.join(annotations_dir, 'annotation_FSC147_384.json'), 'rb') as file:
            self.annotations = json.load(file)
        self.labels = COCO(os.path.join(annotations_dir, 'instances_' + split + '.json'))
        self.img_name_to_ori_id = self.map_img_name_to_ori_id()

    def get_gt_bboxes(self, idx):
        """Return the ground-truth boxes of sample ``idx`` as ``[x1, y1, x2, y2]`` lists."""
        coco_im_id = self.img_name_to_ori_id[self.image_names[idx]]
        anno_ids = self.labels.getAnnIds([coco_im_id])
        return [xywh_to_x1y1x2y2(a['bbox']) for a in self.labels.loadAnns(anno_ids)]

    def __getitem__(self, idx: int):
        image_name = self.image_names[idx]
        # Context manager closes the file handle once the pixels are decoded.
        with Image.open(os.path.join(self.data_path, 'images_384_VarV2', image_name)) as raw:
            img = T.ToTensor()(raw.convert("RGB"))

        # float32 + reshape keep an image without annotations usable as a
        # (0, 4) tensor in the column-indexed flip / scaling code below.
        gt_bboxes = torch.tensor(self.get_gt_bboxes(idx), dtype=torch.float32).reshape(-1, 4)

        # Exemplar boxes: take corner rows 0 and 2 of each annotated box and
        # flatten them to [x1, y1, x2, y2].
        bboxes = torch.tensor(
            self.annotations[image_name]['box_examples_coordinates'],
            dtype=torch.float32
        )[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]

        density_map = torch.from_numpy(np.load(os.path.join(
            self.data_path,
            'gt_density_map_adaptive_512_512_object_VarV2',
            os.path.splitext(image_name)[0] + '.npy',
        ))).unsqueeze(0)

        if self.split == 'train':
            tiled = False
            _, original_height, original_width = img.shape
            scaling_factor = self.img_size / max(original_height, original_width)
            bboxes_resized = bboxes * scaling_factor
            mean_width = (bboxes_resized[:, 2] - bboxes_resized[:, 0]).mean()
            mean_height = (bboxes_resized[:, 3] - bboxes_resized[:, 1]).mean()
            # Tile only when the exemplars stay reasonably large after resizing.
            if mean_width > 30 and mean_height > 30 and torch.rand(1) < self.tiling_p:
                tiled = True
                tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
                img, bboxes, density_map, gt_bboxes = tiling_augmentation(
                    img, bboxes, self.resize,
                    self.jitter, tile_size, self.horizontal_flip_p,
                    gt_bboxes=gt_bboxes, density_map=density_map
                )
                # The mosaic fills the whole target, so nothing is padded.
                # Previously ``padwh`` was left unbound on this path and the
                # return statement raised UnboundLocalError.
                padwh = (0, 0)
            else:
                img = self.jitter(img)
                img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = resize_and_pad(
                    img, bboxes, density_map, gt_bboxes=gt_bboxes, train=True
                )

            if not tiled and torch.rand(1) < self.horizontal_flip_p:
                img = TVF.hflip(img)
                density_map = TVF.hflip(density_map)
                bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
                gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]
        else:
            img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = tile_multiscale(
                img, bboxes, density_map, gt_bboxes=gt_bboxes
            )

        # Bring the density map to 512x512 while preserving the object count;
        # skip the re-normalisation for an all-zero map (0 / 0 would be NaN).
        original_sum = density_map.sum()
        density_map = self.resize512(density_map)
        resized_sum = density_map.sum()
        if resized_sum > 0:
            density_map = density_map / resized_sum * original_sum

        # NOTE(review): the clamp bound is a literal 1024 rather than
        # self.img_size -- confirm this is intended for other image sizes.
        gt_bboxes = torch.clamp(gt_bboxes, min=0, max=1024)
        img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
        return img, bboxes, density_map, torch.tensor(idx), gt_bboxes, torch.tensor(scaling_factor), padwh

    def __len__(self):
        return len(self.image_names)

    def map_img_name_to_ori_id(self, ):
        """Map every COCO ``file_name`` to its image ``id``."""
        return {info["file_name"]: info["id"] for info in self.labels.imgs.values()}
class LVISDatasetBOX(Dataset):
    """LVIS-style box dataset read from COCO-format annotation files.

    Files read, relative to ``data_path``:
      * ``annotations/unseen_instances_<split>.json`` -- COCO-format boxes
      * ``annotations/unseen_count_<split>.json`` -- exemplar boxes
      * ``images/<file_name>`` -- images

    Each item is the 7-tuple
    ``(img, bboxes, density_map, img_id, gt_bboxes, scaling_factor, (pad_width, pad_height))``.
    The density map is an all-zero placeholder.
    """

    def __init__(
            self, data_path, img_size, split='train', num_objects=3,
            tiling_p=0.5, zero_shot=False, return_ids=False
    ):
        self.split = split
        self.data_path = data_path
        self.horizontal_flip_p = 0.5
        self.tiling_p = tiling_p
        self.img_size = img_size
        self.resize = T.Resize((img_size, img_size), antialias=True)
        self.resize512 = T.Resize((512, 512), antialias=True)
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
        self.num_objects = num_objects
        self.zero_shot = zero_shot
        self.return_ids = return_ids
        self.img_path = os.path.join(data_path, "images")
        self.labels = COCO(os.path.join(self.data_path, 'annotations', 'unseen_instances_' + split + '.json'))
        self.image_ids = self.labels.getImgIds()
        self.count_anno = self.load_json(os.path.join(data_path, "annotations", "unseen_count_" + split + ".json"))
        self.img_name_to_ori_id = self.map_img_name_to_ori_id()

    def load_json(self, json_file):
        """Read and return the parsed content of ``json_file``."""
        with open(json_file, "r") as f:
            return json.load(f)

    def __getitem__(self, idx: int):
        img_id = self.image_ids[idx]
        img_info = self.labels.loadImgs([img_id])[0]
        # Context manager closes the file handle once the pixels are decoded.
        with Image.open(os.path.join(self.img_path, img_info["file_name"])) as raw:
            img = T.ToTensor()(raw.convert("RGB"))

        anns = self.labels.loadAnns(ids=self.labels.getAnnIds([img_id]))
        # reshape keeps an image without annotations usable as a (0, 4) tensor.
        gt_bboxes = torch.tensor(
            [xywh_to_x1y1x2y2(ann["bbox"]) for ann in anns], dtype=torch.float32
        ).reshape(-1, 4)

        # NOTE(review): exemplars are looked up by dataset position ``idx``,
        # not by ``img_id`` -- assumes both files list images in the same order.
        exemplar_boxes = self.count_anno["annotations"][idx]["boxes"]
        bboxes = torch.tensor([xywh_to_x1y1x2y2(box) for box in exemplar_boxes], dtype=torch.float32)[:3]

        density_map = torch.zeros((512, 512)).unsqueeze(0)
        _, original_height, original_width = img.shape

        tiled = False
        if self.split == 'train' and torch.rand(1) < self.tiling_p:
            tiled = True
            tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
            # tiling_augmentation returns four values when gt_bboxes is given;
            # the third (density map) is None here and the placeholder is kept.
            img, bboxes, _, gt_bboxes = tiling_augmentation(
                img, bboxes, self.resize,
                self.jitter, tile_size, self.horizontal_flip_p, gt_bboxes=gt_bboxes
            )
            # These were previously unbound on the tiled path. The scale is
            # img_size over the longer side of the un-tiled image.
            # NOTE(review): the mosaic shrinks each tile further -- confirm
            # which scale downstream consumers expect.
            scaling_factor = self.img_size / max(original_height, original_width)
            pad_width, pad_height = 0, 0
        else:
            img, bboxes, density_map, gt_bboxes, scaling_factor, (pad_width, pad_height) = resize_and_pad(
                img, bboxes, density_map, gt_bboxes=gt_bboxes
            )
            # The placeholder has zero mass, so any sum-preserving rescale of it
            # can yield NaNs; reset it to clean zeros of the returned shape.
            density_map = torch.zeros_like(density_map)

        if self.split == 'train':
            if not tiled:
                img = self.jitter(img)
        img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)

        if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
            img = TVF.hflip(img)
            density_map = TVF.hflip(density_map)
            bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
            gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]

        return img, bboxes, density_map, torch.tensor(img_id), gt_bboxes, scaling_factor, (pad_width, pad_height)

    def __len__(self):
        return len(self.image_ids)

    def map_img_name_to_ori_id(self, ):
        """Map every COCO ``file_name`` to its image ``id``."""
        return {info["file_name"]: info["id"] for info in self.labels.imgs.values()}
#MULTISCALE IMAGES
def tile_multiscale(img, bboxes, density_map, gt_bboxes, size=1024.0, zero_shot=False, train=False):
    """Resize ``img`` so its longer side is 512 and zero-pad it to ``size`` x ``size``.

    The image is placed in the top-left corner; padding is added on the right
    and bottom only. ``bboxes``, ``density_map`` and ``gt_bboxes`` are returned
    untouched, and the reported scaling factor / padding are the constants
    ``0`` and ``(0, 0)`` -- exactly what the previous implementation returned.

    The earlier version additionally built a mosaic of half-scale copies but
    never returned it; that dead code hard-coded 1024 and raised a shape
    mismatch for any other ``size``, so it has been removed.

    NOTE(review): boxes are not rescaled although the image is -- confirm
    callers expect boxes in original-image coordinates here.

    Args:
        img: ``(C, H, W)`` image tensor.
        bboxes, density_map, gt_bboxes: passed through unchanged.
        size: side length of the padded output.
        zero_shot, train: accepted for interface compatibility; unused.

    Returns:
        ``(padded_img, bboxes, density_map, gt_bboxes, 0, (0, 0))``.
    """
    _, original_height, original_width = img.shape
    scaling_factor = 512 / max(original_height, original_width)
    resized_img = torch.nn.functional.interpolate(
        img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear', align_corners=False
    )
    size = int(size)
    pad_height = max(0, size - resized_img.shape[2])
    pad_width = max(0, size - resized_img.shape[3])
    padded_img = torch.nn.functional.pad(
        resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0
    )[0]
    return padded_img, bboxes, density_map, gt_bboxes, 0, (0, 0)
def resize_and_pad(img, bboxes, density_map=None, gt_bboxes=None, size=1024.0, zero_shot=False, train=False):
    """Rescale an image so its longer side is ``size`` and zero-pad it to a square.

    Outside of training / zero-shot mode the scale is reduced further so that
    the mean exemplar side length does not exceed 80 pixels. Padding is added
    on the right and bottom only.

    Args:
        img: ``(C, H, W)`` image tensor.
        bboxes: exemplar boxes ``[x1, y1, x2, y2]`` in ``img`` pixel units.
        density_map: optional ``(1, h, w)`` density map; it is resized like the
            image, padded, brought to 512x512 and re-normalised to keep its sum.
        gt_bboxes: optional ground-truth boxes in the same format as ``bboxes``.
        size: target side length.
        zero_shot, train: when either is true the 80-pixel exemplar cap is skipped.

    Returns:
        ``(padded_img, bboxes, scaling_factor)`` when both ``density_map`` and
        ``gt_bboxes`` are ``None``; otherwise
        ``(padded_img, bboxes, padded_density_map, gt_bboxes, scaling_factor,
        (pad_width, pad_height))`` where a missing optional input stays ``None``.
    """
    _, original_height, original_width = img.shape
    scaling_factor = size / max(original_height, original_width)

    if not zero_shot and not train:
        scaled_bboxes = bboxes * scaling_factor
        a_dim = ((scaled_bboxes[:, 2] - scaled_bboxes[:, 0]).mean() + (
                scaled_bboxes[:, 3] - scaled_bboxes[:, 1]).mean()) / 2
        a_dim = a_dim.item()
        # Guard against degenerate (zero-size) exemplars: no division by zero.
        if a_dim > 0:
            scaling_factor = min(1.0, 80 / a_dim) * scaling_factor

    resized_img = torch.nn.functional.interpolate(
        img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear', align_corners=False
    )
    size = int(size)
    pad_height = max(0, size - resized_img.shape[2])
    pad_width = max(0, size - resized_img.shape[3])
    padded_img = torch.nn.functional.pad(
        resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0
    )[0]

    # Stays None when no density map is supplied (previously unbound, which
    # raised UnboundLocalError whenever only gt_bboxes was given).
    padded_density_map = None
    if density_map is not None:
        original_sum = density_map.sum()
        _, img_h, img_w = img.shape
        # First match the image resolution, then apply the same scale + padding
        # as the image so both stay aligned.
        resized_density_map = torch.nn.functional.interpolate(
            density_map.unsqueeze(0), size=(img_h, img_w), mode='bilinear', align_corners=False
        )
        resized_density_map = torch.nn.functional.interpolate(
            resized_density_map, scale_factor=scaling_factor, mode='bilinear', align_corners=False
        )
        padded_density_map = torch.nn.functional.pad(
            resized_density_map, (0, pad_width, 0, pad_height), mode='constant', value=0
        )
        # Antialiased bilinear resize to 512x512 (what torchvision's
        # Resize((512, 512), antialias=True) performs on float tensors).
        padded_density_map = torch.nn.functional.interpolate(
            padded_density_map, size=(512, 512), mode='bilinear', align_corners=False, antialias=True
        )[0]
        resized_sum = padded_density_map.sum()
        # Preserve the total count; an all-zero map is left as zeros instead
        # of turning into NaNs through 0 / 0.
        if resized_sum > 0:
            padded_density_map = padded_density_map / resized_sum * original_sum

    bboxes = bboxes * scaling_factor
    if gt_bboxes is None and density_map is None:
        return padded_img, bboxes, scaling_factor

    if gt_bboxes is not None:
        gt_bboxes = gt_bboxes * scaling_factor
    return padded_img, bboxes, padded_density_map, gt_bboxes, scaling_factor, (pad_width, pad_height)
import json
import logging
import os
import random
import numpy as np
import torchvision.transforms.functional as trans_F
import torchvision.transforms as T
from einops import rearrange
from PIL import Image, ImageFile
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import (DataLoader, Dataset, RandomSampler,
SequentialSampler)
from torchvision import transforms
from torch.nn.utils.rnn import pad_sequence
def pad_collate_mcac(batch):
    """Collate 4-tuples ``(img, bboxes, image_name, gt_bboxes)``.

    Ground-truth boxes are zero-padded to the longest entry in the batch;
    the remaining fields are stacked along a new leading batch axis.
    """
    imgs, exemplars, names, gt_per_sample = zip(*batch)
    padded_gt = pad_sequence(gt_per_sample, batch_first=True, padding_value=0)
    return torch.stack(imgs), torch.stack(exemplars), torch.stack(names), padded_gt
# Per-channel mean and standard deviation used to normalise RGB images;
# the same values are passed to T.Normalize in the dataset classes.
IM_NORM_MEAN = [0.485, 0.456, 0.406]
IM_NORM_STD = [0.229, 0.224, 0.225]
# Ready-made transform applying the normalisation above to an image tensor.
Normalize_tensor = transforms.Compose(
    [transforms.Normalize(mean=IM_NORM_MEAN, std=IM_NORM_STD)]
)
def denormalize(tensor, means=IM_NORM_MEAN, stds=IM_NORM_STD, clip_0_1=True):
    """Undo a per-channel mean/std normalisation on a copy of ``tensor``.

    Iterates over the first dimension of ``tensor`` in lockstep with ``means``
    and ``stds`` (so the first dimension is treated as the channel axis) and
    computes ``x * std + mean`` for each slice. When ``clip_0_1`` is true the
    result is limited to ``[0, 1]``. The input tensor is left unmodified.
    """
    with torch.no_grad():
        restored = tensor.clone()
        for plane, mu, sigma in zip(restored, means, stds):
            # ``plane`` is a view, so in-place ops write into ``restored``.
            plane.mul_(sigma).add_(mu)
            if clip_0_1:
                plane.masked_fill_(plane < 0, 0)
                plane.masked_fill_(plane > 1, 1)
    return restored
class MCAC_Dataset(Dataset):
    """MCAC multi-class counting dataset.

    Every sample lives in its own directory ``<data_path>/<split>/<im_id>/``
    holding ``img.png`` and ``info_with_occ_bbox.json``. For non-training
    splits the evaluation exemplars are read from
    ``<data_path>/<split>_eval_bboxes.json``.

    ``__getitem__`` returns ``(image, exemplar_bboxes, idx, gt_bboxes)``:
      * training mode (``split == 'train'`` and ``training``): one randomly
        chosen class, up to three random exemplars;
      * otherwise: all classes, zero-padded and stacked along the first axis.
    """

    def __init__(self, data_path,
                 image_size,
                 split='train',
                 num_objects=3,
                 tiling_p=0.5,
                 zero_shot=False,
                 training=True
                 ):
        # Process-wide PIL switch: tolerate truncated image files.
        ImageFile.LOAD_TRUNCATED_IMAGES = True

        self.img_size = (image_size, image_size)
        self.img_channels = 3
        self.split = split
        self.training = training
        if split != 'train':
            # load json with exemplars
            with open(f"{data_path}/{self.split}_eval_bboxes.json", "r") as f:
                self.exemplars = json.load(f)

        self.im_dir = f"{data_path}/{self.split}"

        CFG = dict()
        CFG["MCAC_occ_limit"] = 70
        CFG["MCAC_occ_limit_exemplar"] = 30
        CFG["MCAC_crop_size"] = 672

        self.gs_file = "_c_8"
        self.gs_file += ("_occ_" + str(int(CFG["MCAC_occ_limit"]))) if CFG["MCAC_occ_limit"] != -1 else ""
        self.gs_file += "_non_int"
        self.gs_file += f"_crop{CFG['MCAC_crop_size']}" if CFG["MCAC_crop_size"] != -1 else ""
        self.gs_file += "_np"

        # Sorted so that index -> sample is reproducible across file systems
        # (os.listdir order is arbitrary).
        self.im_ids = sorted(
            f for f in os.listdir(self.im_dir) if os.path.isdir(self.im_dir + "/" + f)
        )
        self.CFG = CFG

        self.toten = transforms.ToTensor()
        self.resize_im = transforms.Resize((self.img_size[0], self.img_size[0]))
        self.bboxes_str = "bboxes_crop672"
        self.centers_str = "centers"
        self.occlusions_str = "occlusions_crop672"
        self.area_str = "area"
        self.json_p = "info_with_occ_bbox.json"

        print(
            f"{self.split} set, size:{len(self.im_ids)}")

    def __len__(self):
        return len(self.im_ids)

    def _center_crop(self, image):
        """Crop an equal margin from each side, derived from ``MCAC_crop_size``."""
        crop_size = self.CFG["MCAC_crop_size"]
        if crop_size == -1:
            return image
        margin_0 = max(0, int((image.shape[1] - crop_size) / 2))
        margin_1 = max(0, int((image.shape[2] - crop_size) / 2))
        # Explicit end indices: the former ``m:-m`` slice became ``0:-0`` (an
        # empty tensor) whenever the image already had the crop size.
        return image[
               :,
               margin_0:image.shape[1] - margin_0,
               margin_1:image.shape[2] - margin_1,
               ]

    def _scaled_bboxes(self, countable, image):
        """Return ``(boxes, occlusions)`` for one class entry of the info JSON.

        Boxes are rescaled from the (cropped) ``image`` size to ``img_size``,
        clipped to ``[0, img_size[0] - 1]`` and reordered per box from the
        flattened ``[a, b, c, d]`` to ``[c, a, d, b]``.
        """
        occlusions = torch.tensor(countable[self.occlusions_str])
        all_bboxes = torch.tensor(countable[self.bboxes_str], dtype=torch.float32)
        all_bboxes[:, :, 0] = all_bboxes[:, :, 0] / (image.shape[1] / self.img_size[0])
        all_bboxes[:, :, 1] = all_bboxes[:, :, 1] / (image.shape[2] / self.img_size[1])
        all_bboxes = torch.clip(all_bboxes, 0, self.img_size[0] - 1)
        all_bboxes = all_bboxes.reshape(-1, 4)
        all_bboxes = torch.stack(
            (all_bboxes[:, 2], all_bboxes[:, 0], all_bboxes[:, 3], all_bboxes[:, 1]),
            dim=1,
        )
        return all_bboxes, occlusions

    def _low_occlusion_exemplars(self, all_bboxes, occlusions):
        """Boxes under the exemplar occlusion limit, or the 3 least occluded ones."""
        candidates = all_bboxes[occlusions < self.CFG["MCAC_occ_limit_exemplar"]]
        if len(candidates) < 3:
            # sort by occlusion -- the least occluded come first
            candidates = all_bboxes[occlusions.argsort()][:3]
        return candidates

    def _finalize_image(self, image):
        """Resize to the target size and apply mean/std normalisation."""
        image = self.resize_im(image)
        return T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)

    def __getitem__(self, idx):
        im_id = self.im_ids[idx]

        image = Image.open(f"{self.im_dir}/{im_id}/img.png")
        image.load()
        if image.mode != "RGB":
            image = image.convert("RGB")
        image = self._center_crop(self.toten(image))

        with open(f"{self.im_dir}/{im_id}/{self.json_p}", "r") as f:
            img_info = json.load(f)

        if self.split == 'train' and self.training:
            # pick one class at random and up to three random exemplars of it
            chosen_class = random.randint(0, len(img_info["countables"]) - 1)
            all_bboxes, occlusions = self._scaled_bboxes(img_info["countables"][chosen_class], image)
            gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
            exemplar_candidates = self._low_occlusion_exemplars(all_bboxes, occlusions)
            exemplar_ids = torch.randperm(exemplar_candidates.shape[0])[:3]
            exemplar_bboxes = exemplar_candidates[exemplar_ids]
            return (
                self._finalize_image(image),
                exemplar_bboxes,
                torch.tensor(idx),
                gt_bboxes
            )

        bboxes = []
        e_bboxes = []
        for c_i, c in enumerate(img_info["countables"]):
            all_bboxes, occlusions = self._scaled_bboxes(c, image)
            gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
            if self.split == 'train':
                exemplar_bboxes = self._low_occlusion_exemplars(all_bboxes, occlusions)
            else:
                # evaluation exemplars are fixed by the eval-bboxes file
                assert self.exemplars[im_id][c_i]['obj_id'] == c['obj_id']
                orig_exemplar_idx = torch.tensor(self.exemplars[im_id][c_i]['eval_bbox_inds'])
                exemplar_bboxes = all_bboxes[orig_exemplar_idx]
            bboxes.append(gt_bboxes)
            e_bboxes.append(exemplar_bboxes)

        # classes differ in box count -> zero-pad to a common length
        bboxes = pad_sequence(bboxes, batch_first=True, padding_value=0)
        e_bboxes = pad_sequence(e_bboxes, batch_first=True, padding_value=0)
        return (
            self._finalize_image(image),
            e_bboxes,
            torch.tensor(idx),
            bboxes
        )

    def _unoccluded_count(self, countable):
        """Number of instances of one class below the occlusion limit."""
        if self.CFG["MCAC_occ_limit"] != -1:
            assert len(countable[self.occlusions_str]) == len(countable["inds"])
            occ = np.array(countable[self.occlusions_str])
            return len(occ[occ < self.CFG["MCAC_occ_limit"]])
        return len(countable["inds"])

    def _load_info(self, im_id):
        """Read the per-image info JSON of ``im_id``."""
        with open(f"{self.im_dir}/{im_id}/{self.json_p}", "r") as f:
            return json.load(f)

    def exlude_images_num_class(self):
        """Keep only images with at most the configured number of non-empty classes.

        Reads ``CFG["MCAC_exclude_imgs_with_num_classes_over"]``, which
        ``__init__`` does not set; the caller must provide it first.
        """
        limit = self.CFG["MCAC_exclude_imgs_with_num_classes_over"]
        new_im_ids = []
        for im_id in self.im_ids:
            img_info = self._load_info(im_id)
            num_countables = sum(
                1 for c in img_info["countables"] if self._unoccluded_count(c) >= 1
            )
            if num_countables <= limit:
                new_im_ids.append(im_id)

        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_num_classes_over']} class, from:{len(self.im_ids)} to {len(new_im_ids)}"
        )
        self.im_ids = new_im_ids

    def exlude_images_counts(self):
        """Drop images in which any class exceeds the configured instance count.

        Reads ``CFG["MCAC_exclude_imgs_with_counts_over"]``, which ``__init__``
        does not set; the caller must provide it first.
        """
        limit = self.CFG["MCAC_exclude_imgs_with_counts_over"]
        new_im_ids = []
        for im_id in self.im_ids:
            img_info = self._load_info(im_id)
            # every class is still counted so the length assertion runs for all
            counts = [self._unoccluded_count(c) for c in img_info["countables"]]
            if all(cnt <= limit for cnt in counts):
                new_im_ids.append(im_id)

        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_counts_over']} count, from:{len(self.im_ids)} to {len(new_im_ids)}"
        )
        self.im_ids = new_im_ids

    def ref_rot(self, image, dots, rects, density):
        """Random flips and 90-degree rotations applied jointly to all inputs."""
        if random.random() > 0.5:
            image = trans_F.hflip(image)
            density = trans_F.hflip(density)
            dots = self.hflip_dots(dots)
            rects = self.hflip_bboxes(rects)
        if random.random() > 0.5:
            image = trans_F.vflip(image)
            density = trans_F.vflip(density)
            dots = self.vflip_dots(dots)
            rects = self.vflip_bboxes(rects)

        rotate_angle = int(random.random() * 4)
        if rotate_angle != 0:
            image = trans_F.rotate(image, rotate_angle * 90)
            density = trans_F.rotate(density, rotate_angle * 90)
            for _i in range(rotate_angle):
                dots = self.rotate_dots_90(dots)
                rects = self.rotate_bboxes_90(rects)

        return image, dots, rects, density

    # In the helpers below, entries equal to -1 mark "no value"; they are
    # remembered before the transform and restored afterwards.

    def rotate_bboxes_90(self, rects):
        none_rects = rects == -1
        new_x_rects = rects[:, :, 0]
        new_y_rects = (self.img_size[1] - 1) - rects[:, :, 1]
        rects = np.stack((new_y_rects, new_x_rects), axis=-2)
        rects[none_rects] = -1
        return rects

    def rotate_dots_90(self, dots):
        none_dots = dots == -1
        new_x = dots[:, :, 1]
        new_y = (self.img_size[1] - 1) - dots[:, :, 0]
        dots = np.stack((new_x, new_y), axis=-1)
        dots[none_dots] = -1
        return dots

    def vflip_bboxes(self, rects):
        none_rects = rects == -1
        rects[:, :, 0] = (self.img_size[1] - 1) - rects[:, :, 0]
        rects[none_rects] = -1
        return rects

    def vflip_dots(self, dots):
        none_dots = dots == -1
        dots[:, :, 1] = (self.img_size[1] - 1) - dots[:, :, 1]
        dots[none_dots] = -1
        return dots

    def hflip_bboxes(self, rects):
        none_rects = rects == -1
        rects[:, :, 1] = (self.img_size[0] - 1) - rects[:, :, 1]
        rects[none_rects] = -1
        return rects

    def hflip_dots(self, dots):
        none_dots = dots == -1
        dots[:, :, 0] = (self.img_size[0] - 1) - dots[:, :, 0]
        dots[none_dots] = -1
        return dots
def get_loader_counting(CFG):
    """Create the loaders for ``CFG`` and return them as ``(train, test)``.

    The evaluation loader is constructed first, then the training loader.
    """
    eval_loader = get_dataloader(CFG, train=False)
    training_loader = get_dataloader(CFG, train=True)
    return training_loader, eval_loader
def get_dataloader(CFG, train):
    """Build a ``DataLoader`` for the dataset named in ``CFG["dataset"]``.

    Args:
        CFG: configuration mapping; reads ``dataset``, ``train_batch_size`` or
            ``eval_batch_size``, ``num_workers`` and ``drop_last``.
        train: ``True`` for a randomly sampled training loader, ``False`` for
            a sequential evaluation loader.

    Raises:
        ValueError: if ``CFG["dataset"]`` is not ``"MCAC"`` or ``"MCAC-M1"``
            (previously this surfaced as an unbound-variable error).
    """
    if CFG["dataset"] not in ("MCAC", "MCAC-M1"):
        raise ValueError(f"Unsupported dataset: {CFG['dataset']!r}")

    # NOTE(review): MCAC_Dataset.__init__ in this file takes
    # (data_path, image_size, split=..., training=...) and has no ``train``
    # keyword, so this call looks stale -- confirm the intended CFG keys and
    # update the construction accordingly.
    dataset = MCAC_Dataset(CFG, train=train)

    if train:
        bs = CFG["train_batch_size"]
        sampler = RandomSampler(dataset)
    else:
        bs = CFG["eval_batch_size"]
        sampler = SequentialSampler(dataset)

    return DataLoader(
        dataset,
        sampler=sampler,
        batch_size=bs,
        num_workers=CFG["num_workers"],
        pin_memory=True,
        drop_last=CFG["drop_last"],
    )
def generate_density_maps(data_path, target_size=(512, 512)):
    """Generate one Gaussian-smoothed density map per annotated image.

    For every entry of ``annotation_FSC147_384.json`` the annotated points are
    rescaled to ``target_size``, accumulated into a count grid and blurred
    with a Gaussian whose sigma is one eighth of the mean exemplar box size.
    Results are written as ``.npy`` files into
    ``<data_path>/gt_density_map_adaptive_<H>_<W>_object_VarV2/``.

    Args:
        data_path: dataset root containing ``images_384_VarV2/`` and the
            annotation file (directly or inside ``annotations/``).
        target_size: ``(height, width)`` of the generated maps.
    """
    density_map_path = os.path.join(
        data_path,
        f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
    )
    os.makedirs(density_map_path, exist_ok=True)

    # The dataset class reads the annotations from ``annotations/``; accept
    # that layout as a fallback to the original top-level location.
    annotation_file = os.path.join(data_path, 'annotation_FSC147_384.json')
    if not os.path.isfile(annotation_file):
        annotation_file = os.path.join(data_path, 'annotations', 'annotation_FSC147_384.json')
    with open(annotation_file, 'rb') as file:
        annotations = json.load(file)

    for image_name, ann in tqdm(annotations.items()):
        # Only the size is needed: read it from the header instead of decoding
        # the whole image into a tensor, and close the file right away.
        with Image.open(os.path.join(data_path, 'images_384_VarV2', image_name)) as image:
            w, h = image.size
        h_ratio, w_ratio = target_size[0] / h, target_size[1] / w

        # reshape keeps an image without points valid as a (0, 2) tensor
        points = (
            torch.tensor(ann['points'], dtype=torch.float32).reshape(-1, 2) *
            torch.tensor([w_ratio, h_ratio])
        ).long()
        points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
        points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)

        bboxes = box_convert(torch.tensor(
            ann['box_examples_coordinates'],
            dtype=torch.float32
        )[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
        bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio])
        # mean (w, h) reversed to (h, w) to match the (row, col) axis order
        window_size = bboxes.mean(dim=0)[2:].numpy()[::-1]

        # Vectorised accumulation; accumulate=True counts coinciding points.
        dmap = torch.zeros(*target_size)
        dmap.index_put_(
            (points[:, 1], points[:, 0]),
            torch.ones(points.size(0)),
            accumulate=True,
        )
        dmap = gaussian_filter(dmap.numpy(), window_size / 8)

        np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)
if __name__ == '__main__':
    # Command-line entry point: generate square density maps of side
    # --image_size for the dataset located at --data_path.
    cli = argparse.ArgumentParser("Density map generator", add_help=False)
    cli.add_argument('--data_path', default='dpath', type=str)
    cli.add_argument('--image_size', default=512, type=int)
    options = cli.parse_args()
    side = options.image_size
    generate_density_maps(options.data_path, (side, side))