import argparse
import json
import os

import numpy as np
import torch
from PIL import Image
from pycocotools.coco import COCO
from scipy.ndimage import gaussian_filter
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
from torchvision import transforms as T
from torchvision.ops import box_convert
from torchvision.transforms import functional as TVF
from tqdm import tqdm


def tiling_augmentation(img, bboxes, resize, jitter, tile_size, hflip_p, gt_bboxes=None, density_map=None):
    def apply_hflip(tensor, apply):
        return TVF.hflip(tensor) if apply else tensor

    def make_tile(x, num_tiles, jitter=None):
        result = list()
        for j in range(num_tiles):
            row = list()
            for k in range(num_tiles):
                t = jitter(x) if jitter is not None else x
                row.append(t)
            result.append(torch.cat(row, dim=-1))
        return torch.cat(result, dim=-2)

    x_tile, y_tile = tile_size
    y_target, x_target = resize.size
    num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil()))

    img = make_tile(img, num_tiles, jitter=jitter)
    c, h, w = img.shape
    img = resize(img)

    if density_map is not None:
        # tile the density map without color jitter (jitter is a photometric transform
        # meant for the RGB image only) and rescale it so the total count is preserved
        density_map = make_tile(density_map, num_tiles)
        original_sum = density_map.sum()
        density_map = resize(density_map)
        density_map = density_map / density_map.sum() * original_sum

    bboxes = bboxes / torch.tensor([w, h, w, h]) * resize.size[0]

    if gt_bboxes is not None:
        # replicate the ground-truth boxes into the four quadrants of the tiled image
        gt_bboxes_ = gt_bboxes / torch.tensor([w, h, w, h]) * resize.size[0]
        gt_bboxes_tiled = torch.cat([
            gt_bboxes_,
            gt_bboxes_ + torch.tensor([0, y_target // 2, 0, y_target // 2]),
            gt_bboxes_ + torch.tensor([x_target // 2, 0, x_target // 2, 0]),
            gt_bboxes_ + torch.tensor([x_target // 2, y_target // 2, x_target // 2, y_target // 2]),
        ])
        return img, bboxes, density_map, gt_bboxes_tiled

    return img, bboxes, density_map


def xywh_to_x1y1x2y2(xywh):
    x, y, w, h = xywh
    return [x, y, x + w, y + h]


def pad_collate(batch):
    (img, bboxes, density_map, image_names, gt_bboxes) = zip(*batch)
    gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
    img = torch.stack(img)
    bboxes = torch.stack(bboxes)
    image_names = torch.stack(image_names)
    gt_bboxes = gt_bboxes_pad
    density_map = torch.stack(density_map)
    return img, bboxes, density_map, image_names, gt_bboxes


def pad_collate_test(batch):
    (img, bboxes, density_map, ids, gt_bboxes, scaling_factor, padwh) = zip(*batch)
    gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
    img = torch.stack(img)
    bboxes = torch.stack(bboxes)
    density_map = torch.stack(density_map)
    ids = torch.stack(ids)
    scaling_factor = torch.tensor(scaling_factor)
    padwh = torch.tensor(padwh)
    return img, bboxes, density_map, ids, gt_bboxes_pad, scaling_factor, padwh


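# Minimal illustrative sketch (not part of the original pipeline; the helper name
# _demo_pad_collate and the dummy shapes are made up): pad_collate stacks the
# fixed-size fields and zero-pads the ragged per-image gt_bboxes so that every
# sample in the batch carries the same number of boxes.
def _demo_pad_collate():
    sample_a = (torch.rand(3, 512, 512), torch.rand(3, 4), torch.rand(1, 512, 512),
                torch.tensor(0), torch.rand(5, 4))
    sample_b = (torch.rand(3, 512, 512), torch.rand(3, 4), torch.rand(1, 512, 512),
                torch.tensor(1), torch.rand(2, 4))
    img, bboxes, density_map, ids, gt_bboxes = pad_collate([sample_a, sample_b])
    # the 2-box sample is padded with zeros up to the 5 boxes of the largest sample
    assert gt_bboxes.shape == (2, 5, 4)
    assert img.shape == (2, 3, 512, 512)

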
class FSC147DATASET(Dataset):
    def __init__(
            self, data_path, img_size, split='train', num_objects=3,
            tiling_p=0.5, zero_shot=False, return_ids=False, training=False
    ):
        self.split = split
        self.data_path = data_path
        self.horizontal_flip_p = 0.5
        self.tiling_p = tiling_p
        self.img_size = img_size
        self.resize = T.Resize((img_size, img_size), antialias=True)
        self.resize512 = T.Resize((512, 512), antialias=True)
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
        self.num_objects = num_objects
        self.zero_shot = zero_shot
        self.return_ids = return_ids
        self.training = training

        with open(
                os.path.join(self.data_path, 'annotations', 'Train_Test_Val_FSC_147.json'), 'rb'
        ) as file:
            splits = json.load(file)
        self.image_names = splits[split]
        with open(
                os.path.join(self.data_path, 'annotations', 'annotation_FSC147_384.json'), 'rb'
        ) as file:
            self.annotations = json.load(file)
        self.labels = COCO(os.path.join(self.data_path, 'annotations', 'instances_' + split + '.json'))
        self.img_name_to_ori_id = self.map_img_name_to_ori_id()

    def get_gt_bboxes(self, idx):
        coco_im_id = self.img_name_to_ori_id[self.image_names[idx]]
        anno_ids = self.labels.getAnnIds([coco_im_id])
        annotations = self.labels.loadAnns(anno_ids)
        bboxes = []
        for a in annotations:
            bboxes.append(xywh_to_x1y1x2y2(a['bbox']))
        return bboxes

    def __getitem__(self, idx: int):
        img = Image.open(os.path.join(
            self.data_path,
            'images_384_VarV2',
            self.image_names[idx]
        )).convert("RGB")
        w, h = img.size
        gt_bboxes = torch.tensor(self.get_gt_bboxes(idx))
        # fig, ax = plt.subplots(1)
        # # Display the image
        # ax.imshow(img)
        # # Plot each bounding box
        # for bbox in gt_bboxes:
        #     x, y, width, height = bbox
        #     rect = patches.Rectangle(
        #         (x, y), width - x, height - y,
        #         linewidth=0.8, edgecolor='r', facecolor='none'
        #     )
        #     ax.add_patch(rect)
        #
        # plt.savefig(os.path.join("/storage/datasets/fsc147/plot/", self.image_names[idx]))
        # plt.close()
        img = T.Compose([
            T.ToTensor(),
        ])(img)
        bboxes = torch.tensor(
            self.annotations[self.image_names[idx]]['box_examples_coordinates'],
            dtype=torch.float32
        )[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]
        # take the bbox with the largest area; bboxes are in xyxy format
        # width = bboxes[:, 2] - bboxes[:, 0]
        # height = bboxes[:, 3] - bboxes[:, 1]
        # area = width * height
        # bboxes = bboxes[area.argsort()]
        # bboxes = bboxes[0].unsqueeze(0)
        density_map = torch.from_numpy(np.load(os.path.join(
            self.data_path,
            'gt_density_map_adaptive_512_512_object_VarV2',
            # 'gt_density_map_adaptive_1024_1024_SAME',
            os.path.splitext(self.image_names[idx])[0] + '.npy',
        ))).unsqueeze(0)

        if self.split == 'train':
            tiled = False
            # data augmentation: tile the image only if the mean width and height of the
            # resized exemplar boxes exceed a predefined threshold (30 px)
            channels, original_height, original_width = img.shape
            longer_dimension = max(original_height, original_width)
            scaling_factor = self.img_size / longer_dimension
            bboxes_resized = bboxes * torch.tensor([scaling_factor, scaling_factor, scaling_factor, scaling_factor])
            if (bboxes_resized[:, 2] - bboxes_resized[:, 0]).mean() > 30 and (
                    bboxes_resized[:, 3] - bboxes_resized[:, 1]).mean() > 30 and torch.rand(1) < self.tiling_p:
                tiled = True
                tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
                img, bboxes, density_map, gt_bboxes = tiling_augmentation(
                    img, bboxes, self.resize,
                    self.jitter, tile_size, self.horizontal_flip_p, gt_bboxes=gt_bboxes, density_map=density_map
                )
            else:
                img = self.jitter(img)
            img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = resize_and_pad(
                img, bboxes, density_map, gt_bboxes=gt_bboxes, train=True
            )
            if not tiled and torch.rand(1) < self.horizontal_flip_p:
                img = TVF.hflip(img)
                density_map = TVF.hflip(density_map)
                bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
                gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]
        else:
            # if bboxes (xyxy) are on average > 50 px, call this
            # width = bboxes[:, 2] - bboxes[:, 0]
            # height = bboxes[:, 3] - bboxes[:, 1]
            # if width.mean() > 50 and height.mean() > 50:
            img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = tile_multiscale(
                img, bboxes, density_map, gt_bboxes=gt_bboxes
            )
            # else:
            #     return 1, 1, 1, 1, 1, 1, 1

        original_sum = density_map.sum()
        density_map = self.resize512(density_map)
        density_map = density_map / density_map.sum() * original_sum
        gt_bboxes = torch.clamp(gt_bboxes, min=0, max=1024)
        img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
        # if self.split == 'train' or self.training:
        #     return img, bboxes, density_map, torch.tensor(idx), gt_bboxes
        # else:
        return img, bboxes, density_map, torch.tensor(idx), gt_bboxes, torch.tensor(scaling_factor), padwh

    def __len__(self):
        return len(self.image_names)

    def map_img_name_to_ori_id(self):
        all_coco_imgs = self.labels.imgs
        map_name_2_id = dict()
        for k, v in all_coco_imgs.items():
            img_id = v["id"]
            img_name = v["file_name"]
            map_name_2_id[img_name] = img_id
        return map_name_2_id


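# Minimal usage sketch (illustrative only; '/path/to/fsc147' and the batch size are
# placeholders, not values from this file). The test-time collate function pads the
# variable-length gt_bboxes and stacks the per-sample scale/pad information:
#
#     from torch.utils.data import DataLoader
#     dataset = FSC147DATASET('/path/to/fsc147', img_size=1024, split='val')
#     loader = DataLoader(dataset, batch_size=4, collate_fn=pad_collate_test)
#     img, bboxes, density_map, ids, gt_bboxes, scaling_factor, padwh = next(iter(loader))

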
class LVISDatasetBOX(Dataset):
    def __init__(
            self, data_path, img_size, split='train', num_objects=3,
            tiling_p=0.5, zero_shot=False, return_ids=False
    ):
        self.split = split
        self.data_path = data_path
        self.horizontal_flip_p = 0.5
        self.tiling_p = tiling_p
        self.img_size = img_size
        self.resize = T.Resize((img_size, img_size), antialias=True)
        self.resize512 = T.Resize((512, 512), antialias=True)
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
        self.num_objects = num_objects
        self.zero_shot = zero_shot
        self.return_ids = return_ids
        self.img_path = os.path.join(data_path, "images")
        # if split == 'val' or split == 'test':
        self.labels = COCO(os.path.join(self.data_path, 'annotations', 'unseen_instances_' + split + '.json'))
        self.image_ids = self.labels.getImgIds()
        self.count_anno = self.load_json(os.path.join(data_path, "annotations", "unseen_count_" + split + ".json"))
        self.img_name_to_ori_id = self.map_img_name_to_ori_id()

    def load_json(self, json_file):
        with open(json_file, "r") as f:
            data = json.load(f)
        return data

    def __getitem__(self, idx: int):
        img_id = self.image_ids[idx]
        img_info = self.labels.loadImgs([img_id])[0]
        img_file = img_info["file_name"]
        img = Image.open(os.path.join(self.img_path, img_file)).convert("RGB")

        ann_ids = self.labels.getAnnIds([img_id])
        anns = self.labels.loadAnns(ids=ann_ids)
        # collect the ground-truth boxes as float32 tensors
        gt_bboxes = [instance["bbox"] for instance in anns]
        gt_bboxes = torch.tensor(gt_bboxes, dtype=torch.float32)
        # convert from xywh to x1y1x2y2
        gt_bboxes = torch.tensor([xywh_to_x1y1x2y2(bbox) for bbox in gt_bboxes], dtype=torch.float32)
        bboxes = self.count_anno["annotations"][idx]["boxes"]
        bboxes = torch.tensor([xywh_to_x1y1x2y2(bbox) for bbox in bboxes], dtype=torch.float32)[:3]

        img = T.Compose([
            T.ToTensor(),
        ])(img)
        density_map = torch.zeros((512, 512)).unsqueeze(0)

        # data augmentation
        tiled = False
        if self.split == 'train' and torch.rand(1) < self.tiling_p:
            tiled = True
            tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
            # tiling_augmentation returns the (unused) density map as its third value
            img, bboxes, _, gt_bboxes = tiling_augmentation(
                img, bboxes, self.resize,
                self.jitter, tile_size, self.horizontal_flip_p, gt_bboxes=gt_bboxes
            )
        else:
            img, bboxes, density_map, gt_bboxes, scaling_factor, (pad_width, pad_height) = resize_and_pad(
                img, bboxes, density_map, gt_bboxes=gt_bboxes)
        if self.split == 'train':
            if not tiled:
                img = self.jitter(img)
        img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
        if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
            img = TVF.hflip(img)
            density_map = TVF.hflip(density_map)
            bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
            gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]
        return img, bboxes, density_map, torch.tensor(img_id), gt_bboxes, scaling_factor, (pad_width, pad_height)

    def __len__(self):
        return len(self.image_ids)

    def map_img_name_to_ori_id(self):
        all_coco_imgs = self.labels.imgs
        map_name_2_id = dict()
        for k, v in all_coco_imgs.items():
            img_id = v["id"]
            img_name = v["file_name"]
            map_name_2_id[img_name] = img_id
        return map_name_2_id


# MULTISCALE IMAGES
def tile_multiscale(img, bboxes, density_map, gt_bboxes, size=1024.0, zero_shot=False, train=False):
    # create an image with one large repetition of the original image (scaled to 512 px),
    # while the rest of the canvas is filled with small repetitions at half that scale
    resize512 = T.Resize((512, 512), antialias=True)
    channels, original_height, original_width = img.shape
    longer_dimension = max(original_height, original_width)
    scaling_factor = 512 / longer_dimension
    scaled_bboxes = bboxes * scaling_factor
    resized_img = torch.nn.functional.interpolate(img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear',
                                                  align_corners=False)
    size = int(size)
    pad_height = max(0, size - resized_img.shape[2])
    pad_width = max(0, size - resized_img.shape[3])
    padded_img = torch.nn.functional.pad(resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]
    resized_img2 = torch.nn.functional.interpolate(img.unsqueeze(0), scale_factor=scaling_factor / 2, mode='bilinear',
                                                   align_corners=False)[0]
    w, h = resized_img2.shape[1], resized_img2.shape[2]
    # make a 1024x1024 image filled with repetitions of resized_img2
    padded_img2 = torch.nn.functional.pad(resized_img2, (0, 1024 - h, 0, 1024 - w), mode='constant', value=0)
    for i in range(0, 1024, w):
        for j in range(0, 1024, h):
            pad_w, pad_h = padded_img2[:, i:i + w, j:j + h].shape[1], padded_img2[:, i:i + w, j:j + h].shape[2]
            padded_img2[:, i:i + pad_w, j:j + pad_h] = resized_img2[:, :pad_w, :pad_h]
    # overwrite the tiled background with the full-resolution (512-scaled) image
    padded_img2[padded_img != 0] = padded_img[padded_img != 0]
    return padded_img, bboxes, density_map, gt_bboxes, 0, (0, 0)


def resize_and_pad(img, bboxes, density_map=None, gt_bboxes=None, size=1024.0, zero_shot=False, train=False):
    resize512 = T.Resize((512, 512), antialias=True)
    channels, original_height, original_width = img.shape
    longer_dimension = max(original_height, original_width)
    scaling_factor = size / longer_dimension
    scaled_bboxes = bboxes * scaling_factor
    if not zero_shot and not train:
        # shrink the image further if the scaled exemplar boxes exceed ~80 px on average
        a_dim = ((scaled_bboxes[:, 2] - scaled_bboxes[:, 0]).mean() + (
                scaled_bboxes[:, 3] - scaled_bboxes[:, 1]).mean()) / 2
        scaling_factor = min(1.0, 80 / a_dim.item()) * scaling_factor
    resized_img = torch.nn.functional.interpolate(img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear',
                                                  align_corners=False)
    size = int(size)
    pad_height = max(0, size - resized_img.shape[2])
    pad_width = max(0, size - resized_img.shape[3])
    padded_img = torch.nn.functional.pad(resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]
    if density_map is not None:
        original_sum = density_map.sum()
        _, w0, h0 = density_map.shape
        _, img_h, img_w = img.shape
        resized_density_map = torch.nn.functional.interpolate(density_map.unsqueeze(0), size=(img_h, img_w),
                                                              mode='bilinear', align_corners=False)
        resized_density_map = torch.nn.functional.interpolate(resized_density_map, scale_factor=scaling_factor,
                                                              mode='bilinear', align_corners=False)
        padded_density_map = \
            torch.nn.functional.pad(resized_density_map, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]
        padded_density_map = resize512(padded_density_map)
        padded_density_map = padded_density_map / padded_density_map.sum() * original_sum
    bboxes = bboxes * torch.tensor([scaling_factor, scaling_factor, scaling_factor, scaling_factor]).to(bboxes.device)
    if gt_bboxes is None and density_map is None:
        return padded_img, bboxes, scaling_factor
    gt_bboxes = gt_bboxes * torch.tensor([scaling_factor, scaling_factor, scaling_factor, scaling_factor])
    return padded_img, bboxes, padded_density_map, gt_bboxes, scaling_factor, (pad_width, pad_height)


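# Minimal sanity-check sketch (illustrative only; the helper name _demo_resize_and_pad
# and the dummy shapes are made up). With train=True the image is scaled so that its
# longer side becomes `size` (1024 by default), the shorter side is zero-padded, and
# the density map is renormalized so the object count is preserved.
def _demo_resize_and_pad():
    img = torch.rand(3, 384, 512)
    bboxes = torch.tensor([[10., 10., 50., 50.]])
    gt_bboxes = torch.tensor([[10., 10., 50., 50.]])
    density_map = torch.rand(1, 384, 512)
    out_img, out_boxes, out_dmap, out_gt, sf, (pad_w, pad_h) = resize_and_pad(
        img, bboxes, density_map, gt_bboxes=gt_bboxes, train=True
    )
    # longer side 512 -> 1024 gives scaling_factor 2.0; the 768-px side is padded by 256
    assert sf == 2.0 and (pad_w, pad_h) == (0, 256)
    assert out_img.shape == (3, 1024, 1024) and out_dmap.shape == (1, 512, 512)

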
import json
import logging
import os
import random

import numpy as np
import torch
import torchvision.transforms as T
import torchvision.transforms.functional as trans_F
from einops import rearrange
from PIL import Image, ImageFile
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import (DataLoader, Dataset, RandomSampler,
                              SequentialSampler)
from torchvision import transforms


def pad_collate_mcac(batch):
    (img, bboxes, image_names, gt_bboxes) = zip(*batch)
    gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
    img = torch.stack(img)
    bboxes = torch.stack(bboxes)
    image_names = torch.stack(image_names)
    gt_bboxes = gt_bboxes_pad
    return img, bboxes, image_names, gt_bboxes


IM_NORM_MEAN = [0.485, 0.456, 0.406]
IM_NORM_STD = [0.229, 0.224, 0.225]
Normalize_tensor = transforms.Compose(
    [transforms.Normalize(mean=IM_NORM_MEAN, std=IM_NORM_STD)]
)


def denormalize(tensor, means=IM_NORM_MEAN, stds=IM_NORM_STD, clip_0_1=True):
    with torch.no_grad():
        denormalized = tensor.clone()
        for channel, mean, std in zip(denormalized, means, stds):
            channel.mul_(std).add_(mean)
            if clip_0_1:
                channel[channel < 0] = 0
                channel[channel > 1] = 1
        return denormalized


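# Round-trip sketch (illustrative only; the helper name _demo_denormalize is made up):
# normalizing with IM_NORM_MEAN / IM_NORM_STD and then calling denormalize should
# recover the original image up to numerical precision and the [0, 1] clipping.
def _demo_denormalize():
    img = torch.rand(3, 64, 64)
    restored = denormalize(Normalize_tensor(img.clone()))
    assert torch.allclose(restored, img, atol=1e-5)

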
class MCAC_Dataset(Dataset):
    def __init__(self, data_path,
                 image_size,
                 split='train',
                 num_objects=3,
                 tiling_p=0.5,
                 zero_shot=False,
                 training=True
                 ):
        ImageFile.LOAD_TRUNCATED_IMAGES = True
        self.img_size = (image_size, image_size)
        self.img_channels = 3
        self.split = split
        self.training = training
        if split != 'train':
            # load json with exemplars
            with open(f"{data_path}/{self.split}_eval_bboxes.json", "r") as f:
                self.exemplars = json.load(f)
        self.im_dir = f"{data_path}/{self.split}"
        CFG = dict()
        CFG["MCAC_occ_limit"] = 70
        CFG["MCAC_occ_limit_exemplar"] = 30
        CFG["MCAC_crop_size"] = 672
        self.gs_file = f"_c_8"
        self.gs_file += "_occ_" + str(int(CFG["MCAC_occ_limit"])) if CFG["MCAC_occ_limit"] != -1 else ""
        self.gs_file += "_non_int"
        self.gs_file += f"_crop{CFG['MCAC_crop_size']}" if CFG["MCAC_crop_size"] != -1 else ""
        self.gs_file += "_np"
        self.im_ids = [
            f for f in os.listdir(self.im_dir) if os.path.isdir(self.im_dir + "/" + f)
        ]
        self.CFG = CFG
        self.toten = transforms.ToTensor()
        self.resize_im = transforms.Resize((self.img_size[0], self.img_size[0]))
        self.bboxes_str = "bboxes_crop672"
        self.centers_str = "centers"
        self.occlusions_str = "occlusions_crop672"
        self.area_str = "area"
        self.json_p = f"info_with_occ_bbox.json"
        # CFG["MCAC_exclude_imgs_with_num_classes_over"] = 1
        # self.exlude_images_num_class()
        print(f"{self.split} set, size:{len(self.im_ids)}")

    def __len__(self):
        return len(self.im_ids)

    def __getitem__(self, idx):
        im_id = self.im_ids[idx]
        image = Image.open(f"{self.im_dir}/{im_id}/img.png")
        image.load()
        if image.mode != "RGB":
            image = image.convert("RGB")
        image = self.toten(image)
        if self.CFG["MCAC_crop_size"] != -1:
            crop_boundary_size_0 = int(
                (image.shape[1] - self.CFG["MCAC_crop_size"]) / 2
            )
            crop_boundary_size_1 = int(
                (image.shape[2] - self.CFG["MCAC_crop_size"]) / 2
            )
            image = image[
                :,
                crop_boundary_size_0:-crop_boundary_size_0,
                crop_boundary_size_1:-crop_boundary_size_1,
            ]
        with open(f"{self.im_dir}/{im_id}/{self.json_p}", "r") as f:
            img_info = json.load(f)

        if self.split == 'train' and self.training:
            # choose a random class from img_info["countables"] and use its boxes
            chosen_class = random.randint(0, len(img_info["countables"]) - 1)
            # exemplar_bboxes are 3 boxes randomly selected from the chosen class
            occlusions = torch.tensor(img_info["countables"][chosen_class][self.occlusions_str])
            all_bboxes = torch.tensor(img_info["countables"][chosen_class][self.bboxes_str], dtype=torch.float32)
            all_bboxes[:, :, 0] = all_bboxes[:, :, 0] / (image.shape[1] / self.img_size[0])
            all_bboxes[:, :, 1] = all_bboxes[:, :, 1] / (image.shape[2] / self.img_size[1])
            all_bboxes = torch.clip(
                all_bboxes, 0, self.img_size[0] - 1
            )
            all_bboxes = all_bboxes.reshape(-1, 4)
            all_bboxes = torch.stack(
                (all_bboxes[:, 2], all_bboxes[:, 0], all_bboxes[:, 3], all_bboxes[:, 1]),
                axis=1,
            )
            gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
            exemplar_candidates = all_bboxes[occlusions < self.CFG["MCAC_occ_limit_exemplar"]]
            if len(exemplar_candidates) < 3:
                # sort exemplar_candidates by occlusion -- the least occluded come first
                exemplar_candidates = all_bboxes[occlusions.argsort()][:3]
            exemplar_ids = torch.randperm(exemplar_candidates.shape[0])[:3]
            exemplar_bboxes = exemplar_candidates[exemplar_ids]
            image = self.resize_im(image)
            image = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)
            return (
                image,
                exemplar_bboxes,
                torch.tensor(idx),
                gt_bboxes
            )

        bboxes = []
        e_bboxes = []
        for c_i, c in enumerate(img_info["countables"]):
            occlusions = torch.tensor(img_info["countables"][c_i][self.occlusions_str])
            all_bboxes = torch.tensor(img_info["countables"][c_i][self.bboxes_str], dtype=torch.float32)
            all_bboxes[:, :, 0] = all_bboxes[:, :, 0] / (image.shape[1] / self.img_size[0])
            all_bboxes[:, :, 1] = all_bboxes[:, :, 1] / (image.shape[2] / self.img_size[1])
            all_bboxes = torch.clip(
                all_bboxes, 0, self.img_size[0] - 1
            )
            all_bboxes = all_bboxes.reshape(-1, 4)
            all_bboxes = torch.stack(
                (all_bboxes[:, 2], all_bboxes[:, 0], all_bboxes[:, 3], all_bboxes[:, 1]),
                axis=1,
            )
            gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
            if self.split == 'train':
                exemplar_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit_exemplar"]]
                if len(exemplar_bboxes) < 3:
                    # sort candidates by occlusion -- the least occluded come first
                    exemplar_bboxes = all_bboxes[occlusions.argsort()][:3]
            else:
                assert self.exemplars[im_id][c_i]['obj_id'] == c['obj_id']
                orig_exemplar_idx = torch.tensor(self.exemplars[im_id][c_i]['eval_bbox_inds'])
                # all_bbox_idx = torch.tensor(c['inds'])
                # mask = torch.isin(all_bbox_idx, orig_exemplar_idx)
                # indices = torch.nonzero(mask, as_tuple=True)[0]
                exemplar_bboxes = all_bboxes[orig_exemplar_idx]
            bboxes.append(gt_bboxes)
            e_bboxes.append(exemplar_bboxes)
        image = self.resize_im(image)
        bboxes = pad_sequence(bboxes, batch_first=True, padding_value=0)
        e_bboxes = pad_sequence(e_bboxes, batch_first=True, padding_value=0)
        image = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)
        return (
            image,
            e_bboxes,
            torch.tensor(idx),
            bboxes
        )

    def exlude_images_num_class(self):
        new_im_ids = []
        for id in self.im_ids:
            with open(f"{self.im_dir}/{id}/{self.json_p}", "r") as f:
                img_info = json.load(f)
            num_countables = 0
            for c in img_info["countables"]:
                if self.CFG["MCAC_occ_limit"] != -1:
                    assert len(c[self.occlusions_str]) == len(c["inds"])
                    cnt_np = np.array(c[self.occlusions_str])
                    inds = cnt_np < self.CFG["MCAC_occ_limit"]
                    cnt_np = cnt_np[inds]
                    cnt = len(cnt_np)
                else:
                    cnt = len(c["inds"])
                if cnt >= 1:
                    num_countables += 1
            if (
                    num_countables
                    <= self.CFG["MCAC_exclude_imgs_with_num_classes_over"]
            ):
                new_im_ids.append(id)
        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_num_classes_over']} class, from:{len(self.im_ids)} to {len(new_im_ids)}"
        )
        self.im_ids = new_im_ids

    def exlude_images_counts(self):
        new_im_ids = []
        all_counts = []
        for id in self.im_ids:
            with open(f"{self.im_dir}/{id}/{self.json_p}", "r") as f:
                img_info = json.load(f)
            include = True
            for c in img_info["countables"]:
                if self.CFG["MCAC_occ_limit"] != -1:
                    assert len(c[self.occlusions_str]) == len(c["inds"])
                    cnt_np = np.array(c[self.occlusions_str])
                    inds = cnt_np < self.CFG["MCAC_occ_limit"]
                    cnt_np = cnt_np[inds]
                    cnt = len(cnt_np)
                else:
                    cnt = len(c["inds"])
                if cnt != 0:
                    all_counts.append(cnt)
                if cnt > self.CFG["MCAC_exclude_imgs_with_counts_over"]:
                    include = False
            if include:
                new_im_ids.append(id)
        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_counts_over']} count, from:{len(self.im_ids)} to {len(new_im_ids)}"
        )
        self.im_ids = new_im_ids

    def ref_rot(self, image, dots, rects, density):
        if random.random() > 0.5:
            image = trans_F.hflip(image)
            density = trans_F.hflip(density)
            dots = self.hflip_dots(dots)
            rects = self.hflip_bboxes(rects)
        if random.random() > 0.5:
            image = trans_F.vflip(image)
            density = trans_F.vflip(density)
            dots = self.vflip_dots(dots)
            rects = self.vflip_bboxes(rects)
        rotate_angle = int(random.random() * 4)
        if rotate_angle != 0:
            image = trans_F.rotate(image, rotate_angle * 90)
            density = trans_F.rotate(density, rotate_angle * 90)
            for _i in range(rotate_angle):
                dots = self.rotate_dots_90(dots)
                rects = self.rotate_bboxes_90(rects)
        return image, dots, rects, density

    def rotate_bboxes_90(self, rects):
        none_rects = rects == -1
        new_x_rects = rects[:, :, 0]
        new_y_rects = (self.img_size[1] - 1) - rects[:, :, 1]
        rects = np.stack((new_y_rects, new_x_rects), axis=-2)
        rects[none_rects] = -1
        return rects

    def rotate_dots_90(self, dots):
        none_dots = dots == -1
        new_x = dots[:, :, 1]
        new_y = (self.img_size[1] - 1) - dots[:, :, 0]
        dots = np.stack((new_x, new_y), axis=-1)
        dots[none_dots] = -1
        return dots

    def vflip_bboxes(self, rects):
        none_rects = rects == -1
        rects[:, :, 0] = (self.img_size[1] - 1) - rects[:, :, 0]
        rects[none_rects] = -1
        return rects

    def vflip_dots(self, dots):
        none_dots = dots == -1
        dots[:, :, 1] = (self.img_size[1] - 1) - dots[:, :, 1]
        dots[none_dots] = -1
        return dots

    def hflip_bboxes(self, rects):
        none_rects = rects == -1
        rects[:, :, 1] = (self.img_size[0] - 1) - rects[:, :, 1]
        rects[none_rects] = -1
        return rects

    def hflip_dots(self, dots):
        none_dots = dots == -1
        dots[:, :, 0] = (self.img_size[0] - 1) - dots[:, :, 0]
        dots[none_dots] = -1
        return dots


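# Minimal usage sketch (illustrative only; '/path/to/MCAC' and the batch size are
# placeholders, not values from this file). Each MCAC sample can return a variable
# number of ground-truth boxes, so pad_collate_mcac pads them per batch:
#
#     dataset = MCAC_Dataset('/path/to/MCAC', image_size=512, split='train')
#     loader = DataLoader(dataset, batch_size=4, collate_fn=pad_collate_mcac)
#     img, exemplar_bboxes, ids, gt_bboxes = next(iter(loader))

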
def get_loader_counting(CFG):
    test_loader = get_dataloader(CFG, train=False)
    train_loader = get_dataloader(CFG, train=True)
    return train_loader, test_loader


def get_dataloader(CFG, train):
    if CFG["dataset"] == "MCAC" or CFG["dataset"] == "MCAC-M1":
        # NOTE: this helper builds the dataset from a CFG dict, i.e. it expects a
        # CFG-driven MCAC_Dataset constructor rather than the signature defined above
        dataset = MCAC_Dataset(CFG, train=train)
    if train:
        bs = CFG["train_batch_size"]
        sampler = RandomSampler(dataset)
    else:
        bs = CFG["eval_batch_size"]
        sampler = SequentialSampler(dataset)
    loader = DataLoader(
        dataset,
        sampler=sampler,
        batch_size=bs,
        num_workers=CFG["num_workers"],
        pin_memory=True,
        drop_last=CFG["drop_last"],
    )
    return loader


def generate_density_maps(data_path, target_size=(512, 512)):
    density_map_path = os.path.join(
        data_path,
        f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
    )
    if not os.path.isdir(density_map_path):
        os.makedirs(density_map_path)

    with open(
            os.path.join(data_path, 'annotation_FSC147_384.json'), 'rb'
    ) as file:
        annotations = json.load(file)

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    for i, (image_name, ann) in enumerate(tqdm(annotations.items())):
        _, h, w = T.ToTensor()(Image.open(os.path.join(
            data_path,
            'images_384_VarV2',
            image_name
        ))).size()
        h_ratio, w_ratio = target_size[0] / h, target_size[1] / w

        points = (
            torch.tensor(ann['points'], device=device) *
            torch.tensor([w_ratio, h_ratio], device=device)
        ).long()
        points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
        points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)

        bboxes = box_convert(torch.tensor(
            ann['box_examples_coordinates'],
            dtype=torch.float32,
            device=device
        )[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
        bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio], device=device)
        window_size = bboxes.mean(dim=0)[2:].cpu().numpy()[::-1]

        dmap = torch.zeros(*target_size)
        for p in range(points.size(0)):
            dmap[points[p, 1], points[p, 0]] += 1
        dmap = gaussian_filter(dmap.cpu().numpy(), window_size / 8)
        np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)


if __name__ == '__main__':
    parser = argparse.ArgumentParser("Density map generator", add_help=False)
    parser.add_argument(
        '--data_path',
        default='dpath',
        type=str
    )
    parser.add_argument('--image_size', default=512, type=int)
    args = parser.parse_args()
    generate_density_maps(args.data_path, (args.image_size, args.image_size))