import argparse
import json
import os

import numpy as np
import torch
from PIL import Image
from pycocotools.coco import COCO
from scipy.ndimage import gaussian_filter
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
from torchvision import transforms as T
from torchvision.ops import box_convert
from torchvision.transforms import functional as TVF
from tqdm import tqdm


def tiling_augmentation(img, bboxes, resize, jitter, tile_size, hflip_p, gt_bboxes=None, density_map=None):
    # hflip_p is kept for API compatibility with the callers; flipping itself
    # is applied by the callers, not inside this function.
    def make_tile(x, num_tiles, jitter=None):
        result = list()
        for j in range(num_tiles):
            row = list()
            for k in range(num_tiles):
                t = jitter(x) if jitter is not None else x
                row.append(t)
            result.append(torch.cat(row, dim=-1))
        return torch.cat(result, dim=-2)

    x_tile, y_tile = tile_size
    y_target, x_target = resize.size
    num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil()))
    img = make_tile(img, num_tiles, jitter=jitter)
    c, h, w = img.shape  # tiled size, before the final resize
    img = resize(img)

    if density_map is not None:
        # NOTE: tile the density map *without* jitter (the original passed
        # jitter=jitter here, which photometrically distorts the counts),
        # then renormalize so the resized map keeps the original sum.
        density_map = make_tile(density_map, num_tiles)
        original_sum = density_map.sum()
        density_map = resize(density_map)
        density_map = density_map / density_map.sum() * original_sum

    bboxes = bboxes / torch.tensor([w, h, w, h]) * resize.size[0]
    if gt_bboxes is not None:
        # Replicate the ground-truth boxes into the four quadrants of the tiled image.
        gt_bboxes_ = gt_bboxes / torch.tensor([w, h, w, h]) * resize.size[0]
        gt_bboxes_tiled = torch.cat([
            gt_bboxes_,
            gt_bboxes_ + torch.tensor([0, y_target // 2, 0, y_target // 2]),
            gt_bboxes_ + torch.tensor([x_target // 2, 0, x_target // 2, 0]),
            gt_bboxes_ + torch.tensor([x_target // 2, y_target // 2, x_target // 2, y_target // 2]),
        ])
        return img, bboxes, density_map, gt_bboxes_tiled
    return img, bboxes, density_map


def xywh_to_x1y1x2y2(xywh):
    x, y, w, h = xywh
    return [x, y, x + w, y + h]


def pad_collate(batch):
    (img, bboxes, density_map, image_names, gt_bboxes) = zip(*batch)
    gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
    img = torch.stack(img)
    bboxes = torch.stack(bboxes)
    image_names = torch.stack(image_names)
    density_map = torch.stack(density_map)
    return img, bboxes, density_map, image_names, gt_bboxes_pad


def pad_collate_test(batch):
    (img, bboxes, density_map, ids, gt_bboxes, scaling_factor, padwh) = zip(*batch)
    gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
    img = torch.stack(img)
    bboxes = torch.stack(bboxes)
    density_map = torch.stack(density_map)
    ids = torch.stack(ids)
    scaling_factor = torch.tensor(scaling_factor)
    padwh = torch.tensor(padwh)
    return img, bboxes, density_map, ids, gt_bboxes_pad, scaling_factor, padwh
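# A minimal usage sketch (an assumption, not part of the original pipeline):
# the collate functions above exist because each image carries a different
# number of ground-truth boxes, so `gt_bboxes` is padded with pad_sequence
# while the fixed-size tensors are simply stacked. `pad_collate_test` matches
# the 7-tuple returned by FSC147DATASET.__getitem__ below, e.g.:
#
#   from torch.utils.data import DataLoader
#   dataset = FSC147DATASET('/path/to/FSC147', img_size=512, split='val')
#   loader = DataLoader(dataset, batch_size=4, collate_fn=pad_collate_test)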
class FSC147DATASET(Dataset):
    def __init__(
            self, data_path, img_size, split='train', num_objects=3,
            tiling_p=0.5, zero_shot=False, return_ids=False, training=False
    ):
        self.split = split
        self.data_path = data_path
        self.horizontal_flip_p = 0.5
        self.tiling_p = tiling_p
        self.img_size = img_size
        self.resize = T.Resize((img_size, img_size), antialias=True)
        self.resize512 = T.Resize((512, 512), antialias=True)
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
        self.num_objects = num_objects
        self.zero_shot = zero_shot
        self.return_ids = return_ids
        self.training = training

        with open(
                os.path.join(self.data_path, 'annotations', 'Train_Test_Val_FSC_147.json'), 'rb'
        ) as file:
            splits = json.load(file)
            self.image_names = splits[split]
        with open(
                os.path.join(self.data_path, 'annotations', 'annotation_FSC147_384.json'), 'rb'
        ) as file:
            self.annotations = json.load(file)
        self.labels = COCO(os.path.join(self.data_path, 'annotations', 'instances_' + split + '.json'))
        self.img_name_to_ori_id = self.map_img_name_to_ori_id()

    def get_gt_bboxes(self, idx):
        coco_im_id = self.img_name_to_ori_id[self.image_names[idx]]
        anno_ids = self.labels.getAnnIds([coco_im_id])
        annotations = self.labels.loadAnns(anno_ids)
        return [xywh_to_x1y1x2y2(a['bbox']) for a in annotations]

    def __getitem__(self, idx: int):
        img = Image.open(os.path.join(
            self.data_path, 'images_384_VarV2', self.image_names[idx]
        )).convert("RGB")
        w, h = img.size
        gt_bboxes = torch.tensor(self.get_gt_bboxes(idx))
        img = T.Compose([T.ToTensor()])(img)
        # The first three exemplar boxes, reduced to xyxy corners.
        bboxes = torch.tensor(
            self.annotations[self.image_names[idx]]['box_examples_coordinates'],
            dtype=torch.float32
        )[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]

        density_map = torch.from_numpy(np.load(os.path.join(
            self.data_path, 'gt_density_map_adaptive_512_512_object_VarV2',
            os.path.splitext(self.image_names[idx])[0] + '.npy',
        ))).unsqueeze(0)

        if self.split == 'train':
            tiled = False
            padwh = (0, 0)  # NOTE: was left undefined in the tiled branch of the original
            # Tile only when the exemplars stay reasonably large (mean side
            # above 30 px) after resizing to the target size.
            channels, original_height, original_width = img.shape
            longer_dimension = max(original_height, original_width)
            scaling_factor = self.img_size / longer_dimension
            bboxes_resized = bboxes * torch.tensor(
                [scaling_factor, scaling_factor, scaling_factor, scaling_factor])
            if (bboxes_resized[:, 2] - bboxes_resized[:, 0]).mean() > 30 and \
                    (bboxes_resized[:, 3] - bboxes_resized[:, 1]).mean() > 30 and \
                    torch.rand(1) < self.tiling_p:
                tiled = True
                tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
                img, bboxes, density_map, gt_bboxes = tiling_augmentation(
                    img, bboxes, self.resize, self.jitter, tile_size,
                    self.horizontal_flip_p, gt_bboxes=gt_bboxes, density_map=density_map
                )
            else:
                img = self.jitter(img)
                img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = resize_and_pad(
                    img, bboxes, density_map, gt_bboxes=gt_bboxes, train=True
                )
            if not tiled and torch.rand(1) < self.horizontal_flip_p:
                img = TVF.hflip(img)
                density_map = TVF.hflip(density_map)
                bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
                gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]
        else:
            img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = tile_multiscale(
                img, bboxes, density_map, gt_bboxes=gt_bboxes
            )

        # Resize the density map to 512x512 while preserving the object count.
        original_sum = density_map.sum()
        density_map = self.resize512(density_map)
        density_map = density_map / density_map.sum() * original_sum
        gt_bboxes = torch.clamp(gt_bboxes, min=0, max=1024)
        img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
        return img, bboxes, density_map, torch.tensor(idx), gt_bboxes, torch.tensor(scaling_factor), padwh

    def __len__(self):
        return len(self.image_names)

    def map_img_name_to_ori_id(self):
        all_coco_imgs = self.labels.imgs
        map_name_2_id = dict()
        for k, v in all_coco_imgs.items():
            map_name_2_id[v["file_name"]] = v["id"]
        return map_name_2_id
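# Expected on-disk layout for FSC147DATASET, inferred from the paths used
# above (the instances_{split}.json COCO files are the per-instance box
# annotations this dataset expects on top of the standard FSC147 release):
#
#   <data_path>/
#       annotations/Train_Test_Val_FSC_147.json
#       annotations/annotation_FSC147_384.json
#       annotations/instances_{train,val,test}.json
#       images_384_VarV2/*.jpg
#       gt_density_map_adaptive_512_512_object_VarV2/*.npy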
class LVISDatasetBOX(Dataset):
    def __init__(
            self, data_path, img_size, split='train', num_objects=3,
            tiling_p=0.5, zero_shot=False, return_ids=False
    ):
        self.split = split
        self.data_path = data_path
        self.horizontal_flip_p = 0.5
        self.tiling_p = tiling_p
        self.img_size = img_size
        self.resize = T.Resize((img_size, img_size), antialias=True)
        self.resize512 = T.Resize((512, 512), antialias=True)
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
        self.num_objects = num_objects
        self.zero_shot = zero_shot
        self.return_ids = return_ids
        self.img_path = os.path.join(data_path, "images")
        self.labels = COCO(os.path.join(self.data_path, 'annotations', 'unseen_instances_' + split + '.json'))
        self.image_ids = self.labels.getImgIds()
        self.count_anno = self.load_json(os.path.join(data_path, "annotations", "unseen_count_" + split + ".json"))
        self.img_name_to_ori_id = self.map_img_name_to_ori_id()

    def load_json(self, json_file):
        with open(json_file, "r") as f:
            data = json.load(f)
        return data

    def __getitem__(self, idx: int):
        img_id = self.image_ids[idx]
        img_info = self.labels.loadImgs([img_id])[0]
        img_file = img_info["file_name"]
        img = Image.open(os.path.join(self.img_path, img_file)).convert("RGB")

        ann_ids = self.labels.getAnnIds([img_id])
        anns = self.labels.loadAnns(ids=ann_ids)
        # Ground-truth and exemplar boxes: COCO xywh -> x1y1x2y2, float32.
        gt_bboxes = torch.tensor(
            [xywh_to_x1y1x2y2(instance["bbox"]) for instance in anns], dtype=torch.float32)
        bboxes = self.count_anno["annotations"][idx]["boxes"]
        bboxes = torch.tensor([xywh_to_x1y1x2y2(bbox) for bbox in bboxes], dtype=torch.float32)[:3]

        img = T.Compose([T.ToTensor()])(img)
        density_map = torch.zeros((512, 512)).unsqueeze(0)

        # data augmentation
        tiled = False
        if self.split == 'train' and torch.rand(1) < self.tiling_p:
            tiled = True
            tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
            # NOTE: tiling_augmentation returns four values when gt_bboxes is
            # given; the original unpacked three here, which would raise. The
            # zero density map above is kept, and neutral scaling/padding
            # values are used for this branch (they were undefined in the
            # original).
            img, bboxes, _, gt_bboxes = tiling_augmentation(
                img, bboxes, self.resize, self.jitter, tile_size,
                self.horizontal_flip_p, gt_bboxes=gt_bboxes
            )
            scaling_factor, (pad_width, pad_height) = 1.0, (0, 0)
        else:
            img, bboxes, density_map, gt_bboxes, scaling_factor, (pad_width, pad_height) = resize_and_pad(
                img, bboxes, density_map, gt_bboxes=gt_bboxes
            )

        if self.split == 'train' and not tiled:
            img = self.jitter(img)
        img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
        if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
            img = TVF.hflip(img)
            density_map = TVF.hflip(density_map)
            bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
            gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]
        return img, bboxes, density_map, torch.tensor(img_id), gt_bboxes, scaling_factor, (pad_width, pad_height)

    def __len__(self):
        return len(self.image_ids)

    def map_img_name_to_ori_id(self):
        all_coco_imgs = self.labels.imgs
        map_name_2_id = dict()
        for k, v in all_coco_imgs.items():
            map_name_2_id[v["file_name"]] = v["id"]
        return map_name_2_id
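# Hedged sketch (assumed downstream use, not code from this file): both
# datasets return `scaling_factor` and the (right, bottom) padding so that
# boxes predicted on the padded canvas can be mapped back to original image
# coordinates. Because resize_and_pad below pads only on the right/bottom,
# the origin never moves and the inverse is a plain division:
#
#   def boxes_to_original(pred_boxes, scaling_factor):
#       # pred_boxes: (N, 4) xyxy on the padded canvas (hypothetical helper)
#       return pred_boxes / scaling_factor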
# MULTISCALE IMAGES
def tile_multiscale(img, bboxes, density_map, gt_bboxes, size=1024.0, zero_shot=False, train=False):
    # Build a 1024x1024 canvas holding one large copy of the image (longer
    # side resized to 512) in the top-left; the rest of the canvas is filled
    # with half-scale repetitions of the same image.
    channels, original_height, original_width = img.shape
    longer_dimension = max(original_height, original_width)
    scaling_factor = 512 / longer_dimension
    scaled_bboxes = bboxes * scaling_factor
    resized_img = torch.nn.functional.interpolate(
        img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear', align_corners=False)
    size = int(size)
    pad_height = max(0, size - resized_img.shape[2])
    pad_width = max(0, size - resized_img.shape[3])
    padded_img = torch.nn.functional.pad(
        resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]

    # Half-scale copy, tiled over the full canvas.
    resized_img2 = torch.nn.functional.interpolate(
        img.unsqueeze(0), scale_factor=scaling_factor / 2, mode='bilinear', align_corners=False)[0]
    h2, w2 = resized_img2.shape[1], resized_img2.shape[2]
    padded_img2 = torch.nn.functional.pad(resized_img2, (0, 1024 - w2, 0, 1024 - h2), mode='constant', value=0)
    for i in range(0, 1024, h2):
        for j in range(0, 1024, w2):
            tile = padded_img2[:, i:i + h2, j:j + w2]
            tile_h, tile_w = tile.shape[1], tile.shape[2]
            padded_img2[:, i:i + tile_h, j:j + tile_w] = resized_img2[:, :tile_h, :tile_w]
    # Overwrite the tiled background with the large copy wherever it is non-zero.
    padded_img2[padded_img != 0] = padded_img[padded_img != 0]

    # NOTE: padded_img2 (the multiscale canvas) is built but never returned,
    # so the multiscale background is effectively disabled here. The original
    # also returned the *unscaled* exemplar boxes, a scaling factor of 0 and
    # (0, 0) padding; returning the scaled boxes and the real factor/padding
    # matches what resize_and_pad does and looks like the intended behaviour.
    return padded_img, scaled_bboxes, density_map, gt_bboxes, scaling_factor, (pad_width, pad_height)


def resize_and_pad(img, bboxes, density_map=None, gt_bboxes=None, size=1024.0, zero_shot=False, train=False):
    resize512 = T.Resize((512, 512), antialias=True)
    channels, original_height, original_width = img.shape
    longer_dimension = max(original_height, original_width)
    scaling_factor = size / longer_dimension
    scaled_bboxes = bboxes * scaling_factor
    if not zero_shot and not train:
        # Cap the mean exemplar side at 80 px so objects do not end up too
        # large after upscaling to the canvas.
        a_dim = ((scaled_bboxes[:, 2] - scaled_bboxes[:, 0]).mean() +
                 (scaled_bboxes[:, 3] - scaled_bboxes[:, 1]).mean()) / 2
        scaling_factor = min(1.0, 80 / a_dim.item()) * scaling_factor
    resized_img = torch.nn.functional.interpolate(
        img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear', align_corners=False)
    size = int(size)
    pad_height = max(0, size - resized_img.shape[2])
    pad_width = max(0, size - resized_img.shape[3])
    padded_img = torch.nn.functional.pad(
        resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]

    padded_density_map = None  # guards the no-density return below
    if density_map is not None:
        original_sum = density_map.sum()
        _, img_h, img_w = img.shape
        # Bring the density map to image resolution, rescale it with the
        # image, pad it the same way, and renormalize to the original count.
        resized_density_map = torch.nn.functional.interpolate(
            density_map.unsqueeze(0), size=(img_h, img_w), mode='bilinear', align_corners=False)
        resized_density_map = torch.nn.functional.interpolate(
            resized_density_map, scale_factor=scaling_factor, mode='bilinear', align_corners=False)
        padded_density_map = torch.nn.functional.pad(
            resized_density_map, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]
        padded_density_map = resize512(padded_density_map)
        padded_density_map = padded_density_map / padded_density_map.sum() * original_sum

    bboxes = bboxes * torch.tensor(
        [scaling_factor, scaling_factor, scaling_factor, scaling_factor]).to(bboxes.device)
    if gt_bboxes is None and density_map is None:
        return padded_img, bboxes, scaling_factor
    gt_bboxes = gt_bboxes * torch.tensor(
        [scaling_factor, scaling_factor, scaling_factor, scaling_factor])
    return padded_img, bboxes, padded_density_map, gt_bboxes, scaling_factor, (pad_width, pad_height)
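# Worked example of resize_and_pad's scaling rule (illustrative numbers): a
# 384x576 image gives scaling_factor = 1024/576 ≈ 1.78. If the mean exemplar
# side at that scale were 120 px, the factor is damped by min(1.0, 80/120) to
# ≈ 1.19, the image becomes roughly 455x683, and the canvas is padded on the
# right/bottom to 1024x1024 (pad_width ≈ 341, pad_height ≈ 569).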
import random

import torchvision.transforms.functional as trans_F
from PIL import ImageFile
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torchvision import transforms


def pad_collate_mcac(batch):
    (img, bboxes, image_names, gt_bboxes) = zip(*batch)
    gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
    img = torch.stack(img)
    bboxes = torch.stack(bboxes)
    image_names = torch.stack(image_names)
    return img, bboxes, image_names, gt_bboxes_pad


IM_NORM_MEAN = [0.485, 0.456, 0.406]
IM_NORM_STD = [0.229, 0.224, 0.225]
Normalize_tensor = transforms.Compose(
    [transforms.Normalize(mean=IM_NORM_MEAN, std=IM_NORM_STD)]
)


def denormalize(tensor, means=IM_NORM_MEAN, stds=IM_NORM_STD, clip_0_1=True):
    with torch.no_grad():
        denormalized = tensor.clone()
        for channel, mean, std in zip(denormalized, means, stds):
            channel.mul_(std).add_(mean)
            if clip_0_1:
                channel[channel < 0] = 0
                channel[channel > 1] = 1
        return denormalized


class MCAC_Dataset(Dataset):
    def __init__(self, data_path, image_size, split='train', num_objects=3,
                 tiling_p=0.5, zero_shot=False, training=True):
        ImageFile.LOAD_TRUNCATED_IMAGES = True
        self.img_size = (image_size, image_size)
        self.img_channels = 3
        self.split = split
        self.training = training
        if split != 'train':
            # load the json with the fixed evaluation exemplars
            with open(f"{data_path}/{self.split}_eval_bboxes.json", "r") as f:
                self.exemplars = json.load(f)
        self.im_dir = f"{data_path}/{self.split}"

        CFG = dict()
        CFG["MCAC_occ_limit"] = 70
        CFG["MCAC_occ_limit_exemplar"] = 30
        CFG["MCAC_crop_size"] = 672

        self.gs_file = "_c_8"
        self.gs_file += "_occ_" + str(int(CFG["MCAC_occ_limit"])) if CFG["MCAC_occ_limit"] != -1 else ""
        self.gs_file += "_non_int"
        self.gs_file += f"_crop{CFG['MCAC_crop_size']}" if CFG["MCAC_crop_size"] != -1 else ""
        self.gs_file += "_np"

        self.im_ids = [
            f for f in os.listdir(self.im_dir)
            if os.path.isdir(self.im_dir + "/" + f)
        ]
        self.CFG = CFG
        self.toten = transforms.ToTensor()
        self.resize_im = transforms.Resize((self.img_size[0], self.img_size[0]))
        self.bboxes_str = "bboxes_crop672"
        self.centers_str = "centers"
        self.occlusions_str = "occlusions_crop672"
        self.area_str = "area"
        self.json_p = "info_with_occ_bbox.json"
        # Optional filtering, disabled by default:
        # CFG["MCAC_exclude_imgs_with_num_classes_over"] = 1
        # self.exlude_images_num_class()
        print(f"{self.split} set, size:{len(self.im_ids)}")

    def __len__(self):
        return len(self.im_ids)
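    # __getitem__ below has two regimes: during training it samples one random
    # countable class and returns (image, 3 exemplar boxes, idx, gt boxes) for
    # that class only; at val/test time it returns the boxes of every class,
    # padded to a common length with pad_sequence, using the fixed exemplars
    # loaded from <split>_eval_bboxes.json in __init__.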
    def __getitem__(self, idx):
        im_id = self.im_ids[idx]
        image = Image.open(f"{self.im_dir}/{im_id}/img.png")
        image.load()
        if image.mode != "RGB":
            image = image.convert("RGB")
        image = self.toten(image)
        if self.CFG["MCAC_crop_size"] != -1:
            # center crop; assumes the raw image is strictly larger than the crop size
            crop_boundary_size_0 = int((image.shape[1] - self.CFG["MCAC_crop_size"]) / 2)
            crop_boundary_size_1 = int((image.shape[2] - self.CFG["MCAC_crop_size"]) / 2)
            image = image[
                :,
                crop_boundary_size_0:-crop_boundary_size_0,
                crop_boundary_size_1:-crop_boundary_size_1,
            ]
        with open(f"{self.im_dir}/{im_id}/{self.json_p}", "r") as f:
            img_info = json.load(f)

        if self.split == 'train' and self.training:
            # pick one random countable class and use only its boxes
            chosen_class = random.randint(0, len(img_info["countables"]) - 1)
            occlusions = torch.tensor(img_info["countables"][chosen_class][self.occlusions_str])
            all_bboxes = torch.tensor(img_info["countables"][chosen_class][self.bboxes_str], dtype=torch.float32)
            all_bboxes[:, :, 0] = all_bboxes[:, :, 0] / (image.shape[1] / self.img_size[0])
            all_bboxes[:, :, 1] = all_bboxes[:, :, 1] / (image.shape[2] / self.img_size[1])
            all_bboxes = torch.clip(all_bboxes, 0, self.img_size[0] - 1)
            all_bboxes = all_bboxes.reshape(-1, 4)
            # reorder (y1, x1, y2, x2) -> (x1, y1, x2, y2)
            all_bboxes = torch.stack(
                (all_bboxes[:, 2], all_bboxes[:, 0], all_bboxes[:, 3], all_bboxes[:, 1]),
                axis=1,
            )
            gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
            # exemplars: 3 random boxes below the exemplar occlusion limit
            exemplar_candidates = all_bboxes[occlusions < self.CFG["MCAC_occ_limit_exemplar"]]
            if len(exemplar_candidates) < 3:
                # fall back to the three least-occluded boxes
                exemplar_candidates = all_bboxes[occlusions.argsort()][:3]
            exemplar_ids = torch.randperm(exemplar_candidates.shape[0])[:3]
            exemplar_bboxes = exemplar_candidates[exemplar_ids]
            image = self.resize_im(image)
            image = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)
            return image, exemplar_bboxes, torch.tensor(idx), gt_bboxes

        bboxes = []
        e_bboxes = []
        for c_i, c in enumerate(img_info["countables"]):
            occlusions = torch.tensor(img_info["countables"][c_i][self.occlusions_str])
            all_bboxes = torch.tensor(img_info["countables"][c_i][self.bboxes_str], dtype=torch.float32)
            all_bboxes[:, :, 0] = all_bboxes[:, :, 0] / (image.shape[1] / self.img_size[0])
            all_bboxes[:, :, 1] = all_bboxes[:, :, 1] / (image.shape[2] / self.img_size[1])
            all_bboxes = torch.clip(all_bboxes, 0, self.img_size[0] - 1)
            all_bboxes = all_bboxes.reshape(-1, 4)
            all_bboxes = torch.stack(
                (all_bboxes[:, 2], all_bboxes[:, 0], all_bboxes[:, 3], all_bboxes[:, 1]),
                axis=1,
            )
            gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
            if self.split == 'train':
                exemplar_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit_exemplar"]]
                if len(exemplar_bboxes) < 3:
                    # fall back to the three least-occluded boxes
                    exemplar_bboxes = all_bboxes[occlusions.argsort()][:3]
            else:
                assert self.exemplars[im_id][c_i]['obj_id'] == c['obj_id']
                orig_exemplar_idx = torch.tensor(self.exemplars[im_id][c_i]['eval_bbox_inds'])
                exemplar_bboxes = all_bboxes[orig_exemplar_idx]
            bboxes.append(gt_bboxes)
            e_bboxes.append(exemplar_bboxes)

        image = self.resize_im(image)
        bboxes = pad_sequence(bboxes, batch_first=True, padding_value=0)
        e_bboxes = pad_sequence(e_bboxes, batch_first=True, padding_value=0)
        image = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)
        return image, e_bboxes, torch.tensor(idx), bboxes

    def exlude_images_num_class(self):
        new_im_ids = []
        for id in self.im_ids:
            with open(f"{self.im_dir}/{id}/{self.json_p}", "r") as f:
                img_info = json.load(f)
            num_countables = 0
            for c in img_info["countables"]:
                if self.CFG["MCAC_occ_limit"] != -1:
                    assert len(c[self.occlusions_str]) == len(c["inds"])
                    cnt_np = np.array(c[self.occlusions_str])
                    inds = cnt_np < self.CFG["MCAC_occ_limit"]
                    cnt_np = cnt_np[inds]
                    cnt = len(cnt_np)
                else:
                    cnt = len(c["inds"])
                if cnt >= 1:
                    num_countables += 1
            if num_countables <= self.CFG["MCAC_exclude_imgs_with_num_classes_over"]:
                new_im_ids.append(id)
        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_num_classes_over']} class, "
            f"from:{len(self.im_ids)} to {len(new_im_ids)}"
        )
        self.im_ids = new_im_ids
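    # exlude_images_counts below is the count-based counterpart of
    # exlude_images_num_class above; both are optional filters (see the
    # commented-out call in __init__) and each expects its matching
    # CFG["MCAC_exclude_imgs_with_*"] key to be set before use.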
    def exlude_images_counts(self):
        new_im_ids = []
        all_counts = []
        for id in self.im_ids:
            with open(f"{self.im_dir}/{id}/{self.json_p}", "r") as f:
                img_info = json.load(f)
            include = True
            for c in img_info["countables"]:
                if self.CFG["MCAC_occ_limit"] != -1:
                    assert len(c[self.occlusions_str]) == len(c["inds"])
                    cnt_np = np.array(c[self.occlusions_str])
                    inds = cnt_np < self.CFG["MCAC_occ_limit"]
                    cnt_np = cnt_np[inds]
                    cnt = len(cnt_np)
                else:
                    cnt = len(c["inds"])
                if cnt != 0:
                    all_counts.append(cnt)
                if cnt > self.CFG["MCAC_exclude_imgs_with_counts_over"]:
                    include = False
            if include:
                new_im_ids.append(id)
        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_counts_over']} count, "
            f"from:{len(self.im_ids)} to {len(new_im_ids)}"
        )
        self.im_ids = new_im_ids

    def ref_rot(self, image, dots, rects, density):
        if random.random() > 0.5:
            image = trans_F.hflip(image)
            density = trans_F.hflip(density)
            dots = self.hflip_dots(dots)
            rects = self.hflip_bboxes(rects)
        if random.random() > 0.5:
            image = trans_F.vflip(image)
            density = trans_F.vflip(density)
            dots = self.vflip_dots(dots)
            rects = self.vflip_bboxes(rects)
        rotate_angle = int(random.random() * 4)
        if rotate_angle != 0:
            image = trans_F.rotate(image, rotate_angle * 90)
            density = trans_F.rotate(density, rotate_angle * 90)
            for _i in range(rotate_angle):
                dots = self.rotate_dots_90(dots)
                rects = self.rotate_bboxes_90(rects)
        return image, dots, rects, density

    def rotate_bboxes_90(self, rects):
        none_rects = rects == -1
        new_x_rects = rects[:, :, 0]
        new_y_rects = (self.img_size[1] - 1) - rects[:, :, 1]
        rects = np.stack((new_y_rects, new_x_rects), axis=-2)
        rects[none_rects] = -1
        return rects

    def rotate_dots_90(self, dots):
        none_dots = dots == -1
        new_x = dots[:, :, 1]
        new_y = (self.img_size[1] - 1) - dots[:, :, 0]
        dots = np.stack((new_x, new_y), axis=-1)
        dots[none_dots] = -1
        return dots

    def vflip_bboxes(self, rects):
        none_rects = rects == -1
        rects[:, :, 0] = (self.img_size[1] - 1) - rects[:, :, 0]
        rects[none_rects] = -1
        return rects

    def vflip_dots(self, dots):
        none_dots = dots == -1
        dots[:, :, 1] = (self.img_size[1] - 1) - dots[:, :, 1]
        dots[none_dots] = -1
        return dots

    def hflip_bboxes(self, rects):
        none_rects = rects == -1
        rects[:, :, 1] = (self.img_size[0] - 1) - rects[:, :, 1]
        rects[none_rects] = -1
        return rects

    def hflip_dots(self, dots):
        none_dots = dots == -1
        dots[:, :, 0] = (self.img_size[0] - 1) - dots[:, :, 0]
        dots[none_dots] = -1
        return dots


def get_loader_counting(CFG):
    test_loader = get_dataloader(CFG, train=False)
    train_loader = get_dataloader(CFG, train=True)
    return train_loader, test_loader


def get_dataloader(CFG, train):
    if CFG["dataset"] == "MCAC" or CFG["dataset"] == "MCAC-M1":
        # NOTE: the original called MCAC_Dataset(CFG, train=train), which does
        # not match the constructor above; "data_path" and "image_size" are
        # assumed CFG keys.
        dataset = MCAC_Dataset(
            CFG["data_path"], CFG["image_size"],
            split='train' if train else 'val', training=train,
        )
    if train:
        bs = CFG["train_batch_size"]
        sampler = RandomSampler(dataset)
    else:
        bs = CFG["eval_batch_size"]
        sampler = SequentialSampler(dataset)
    loader = DataLoader(
        dataset,
        sampler=sampler,
        batch_size=bs,
        num_workers=CFG["num_workers"],
        pin_memory=True,
        drop_last=CFG["drop_last"],
    )
    return loader
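# A hypothetical CFG for the loaders above: "dataset", "train_batch_size",
# "eval_batch_size", "num_workers" and "drop_last" are the keys
# get_dataloader actually reads; "data_path" and "image_size" are the assumed
# extras introduced by the constructor call above.
#
#   CFG = {
#       "dataset": "MCAC",
#       "data_path": "/path/to/MCAC",
#       "image_size": 512,
#       "train_batch_size": 8,
#       "eval_batch_size": 1,
#       "num_workers": 4,
#       "drop_last": False,
#   }
#   train_loader, test_loader = get_loader_counting(CFG)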
def generate_density_maps(data_path, target_size=(512, 512)):
    density_map_path = os.path.join(
        data_path,
        f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
    )
    if not os.path.isdir(density_map_path):
        os.makedirs(density_map_path)
    with open(os.path.join(data_path, 'annotation_FSC147_384.json'), 'rb') as file:
        annotations = json.load(file)
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    for i, (image_name, ann) in enumerate(tqdm(annotations.items())):
        _, h, w = T.ToTensor()(Image.open(os.path.join(
            data_path, 'images_384_VarV2', image_name
        ))).size()
        h_ratio, w_ratio = target_size[0] / h, target_size[1] / w
        points = (
            torch.tensor(ann['points'], device=device) *
            torch.tensor([w_ratio, h_ratio], device=device)
        ).long()
        points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
        points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)
        bboxes = box_convert(torch.tensor(
            ann['box_examples_coordinates'], dtype=torch.float32, device=device
        )[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
        bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio], device=device)
        # Gaussian kernel size follows the mean exemplar box, as (height, width).
        window_size = bboxes.mean(dim=0)[2:].cpu().numpy()[::-1]
        dmap = torch.zeros(*target_size)
        for p in range(points.size(0)):
            dmap[points[p, 1], points[p, 0]] += 1
        dmap = gaussian_filter(dmap.cpu().numpy(), window_size / 8)
        np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)


if __name__ == '__main__':
    parser = argparse.ArgumentParser("Density map generator", add_help=False)
    parser.add_argument('--data_path', default='dpath', type=str)
    parser.add_argument('--image_size', default=512, type=int)
    args = parser.parse_args()
    generate_density_maps(args.data_path, (args.image_size, args.image_size))
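# Example invocation (path is a placeholder):
#   python <this_script>.py --data_path /path/to/FSC147 --image_size 512
# This writes one <image_name>.npy per image into
# gt_density_map_adaptive_512_512_object_VarV2/, the directory that
# FSC147DATASET reads its density maps from.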