# GECO2-demo/utils/data.py
import argparse
import json
import os
import numpy as np
import torch
from PIL import Image
from pycocotools.coco import COCO
from scipy.ndimage import gaussian_filter
from torch.utils.data import Dataset
from torchvision import transforms as T
from torchvision.ops import box_convert
from torchvision.transforms import functional as TVF
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
def tiling_augmentation(img, bboxes, resize, jitter, tile_size, hflip_p, gt_bboxes=None, density_map=None):
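    # NOTE: hflip_p and the apply_hflip helper below are currently unused here;
    # horizontal flipping is applied by the callers after tiling.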
def apply_hflip(tensor, apply):
return TVF.hflip(tensor) if apply else tensor
def make_tile(x, num_tiles, jitter=None):
result = list()
for j in range(num_tiles):
row = list()
for k in range(num_tiles):
t = jitter(x) if jitter is not None else x
row.append(t)
result.append(torch.cat(row, dim=-1))
return torch.cat(result, dim=-2)
x_tile, y_tile = tile_size
y_target, x_target = resize.size
num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil()))
img = make_tile(img, num_tiles, jitter=jitter)
c, h, w = img.shape
img = resize(img)
if density_map is not None:
density_map = make_tile(density_map, num_tiles, jitter=jitter)
original_sum = density_map.sum()
density_map = resize(density_map)
density_map = density_map / density_map.sum() * original_sum
bboxes = bboxes / torch.tensor([w, h, w, h]) * resize.size[0]
if gt_bboxes is not None:
gt_bboxes_ = gt_bboxes / torch.tensor([w, h, w, h]) * resize.size[0]
gt_bboxes_tiled = torch.cat([gt_bboxes_,
gt_bboxes_ + torch.tensor([0, y_target // 2, 0, y_target // 2]),
gt_bboxes_ + torch.tensor([x_target // 2, 0, x_target // 2, 0]),
gt_bboxes_ + torch.tensor(
[x_target // 2, y_target // 2, x_target // 2, y_target // 2])])
return img, bboxes, density_map, gt_bboxes_tiled
return img, bboxes, density_map
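# A minimal usage sketch of tiling_augmentation (all values illustrative):
#
#   resize = T.Resize((512, 512), antialias=True)
#   jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
#   img = torch.rand(3, 384, 512)                       # C, H, W in [0, 1]
#   boxes = torch.tensor([[10., 10., 60., 60.]])        # exemplar boxes, xyxy
#   dmap = torch.rand(1, 384, 512)
#   tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)  # 1-2 tiles per axis
#   img_t, boxes_t, dmap_t = tiling_augmentation(
#       img, boxes, resize, jitter, tile_size, hflip_p=0.5, density_map=dmap)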
def xywh_to_x1y1x2y2(xywh):
x, y, w, h = xywh
x1 = x
y1 = y
x2 = x + w
y2 = y + h
return [x1, y1, x2, y2]
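# e.g. xywh_to_x1y1x2y2([10, 20, 30, 40]) -> [10, 20, 40, 60]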
def pad_collate(batch):
(img, bboxes, density_map, image_names, gt_bboxes) = zip(*batch)
gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
img = torch.stack(img)
bboxes = torch.stack(bboxes)
image_names = torch.stack(image_names)
gt_bboxes = gt_bboxes_pad
density_map = torch.stack(density_map)
return img, bboxes, density_map, image_names, gt_bboxes
def pad_collate_test(batch):
(img, bboxes, density_map, ids, gt_bboxes, scaling_factor, padwh) = zip(*batch)
gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
img = torch.stack(img)
bboxes = torch.stack(bboxes)
density_map = torch.stack(density_map)
ids = torch.stack(ids)
scaling_factor = torch.tensor(scaling_factor)
padwh = torch.tensor(padwh)
return img, bboxes, density_map, ids, gt_bboxes_pad, scaling_factor, padwh
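# Hedged usage sketch (path illustrative): the dataset's active 7-tuple
# __getitem__ return matches pad_collate_test; pad_collate matches the 5-tuple
# train-time return that is commented out inside FSC147DATASET below.
#
#   from torch.utils.data import DataLoader
#   dataset = FSC147DATASET('/path/to/FSC147', img_size=512, split='val')
#   loader = DataLoader(dataset, batch_size=4, collate_fn=pad_collate_test)
#   img, boxes, dmap, ids, gt_boxes, scale, padwh = next(iter(loader))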
class FSC147DATASET(Dataset):
def __init__(
self, data_path, img_size, split='train', num_objects=3,
tiling_p=0.5, zero_shot=False, return_ids=False, training=False
):
self.split = split
self.data_path = data_path
self.horizontal_flip_p = 0.5
self.tiling_p = tiling_p
self.img_size = img_size
self.resize = T.Resize((img_size, img_size), antialias=True)
self.resize512 = T.Resize((512, 512), antialias=True)
self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
self.num_objects = num_objects
self.zero_shot = zero_shot
self.return_ids = return_ids
self.training = training
with open(
os.path.join(self.data_path, 'annotations', 'Train_Test_Val_FSC_147.json'), 'rb'
) as file:
splits = json.load(file)
self.image_names = splits[split]
with open(
os.path.join(self.data_path, 'annotations', 'annotation_FSC147_384.json'), 'rb'
) as file:
self.annotations = json.load(file)
self.labels = COCO(os.path.join(self.data_path, 'annotations', 'instances_' + split + '.json'))
self.img_name_to_ori_id = self.map_img_name_to_ori_id()
def get_gt_bboxes(self, idx):
coco_im_id = self.img_name_to_ori_id[self.image_names[idx]]
anno_ids = self.labels.getAnnIds([coco_im_id])
annotations = self.labels.loadAnns(anno_ids)
bboxes = []
for a in annotations:
bboxes.append(xywh_to_x1y1x2y2(a['bbox']))
return bboxes
def __getitem__(self, idx: int):
img = Image.open(os.path.join(
self.data_path,
'images_384_VarV2',
self.image_names[idx]
)).convert("RGB")
w, h = img.size
gt_bboxes = torch.tensor(self.get_gt_bboxes(idx))
        img = T.ToTensor()(img)
bboxes = torch.tensor(
self.annotations[self.image_names[idx]]['box_examples_coordinates'],
dtype=torch.float32
)[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]
density_map = torch.from_numpy(np.load(os.path.join(
self.data_path,
'gt_density_map_adaptive_512_512_object_VarV2',
os.path.splitext(self.image_names[idx])[0] + '.npy',
))).unsqueeze(0)
if self.split == 'train':
tiled = False
            # data augmentation: tile only when the resized exemplars are large
            # enough (mean width and height above 30 px)
channels, original_height, original_width = img.shape
longer_dimension = max(original_height, original_width)
scaling_factor = self.img_size / longer_dimension
bboxes_resized = bboxes * torch.tensor([scaling_factor, scaling_factor, scaling_factor, scaling_factor])
if (bboxes_resized[:, 2] - bboxes_resized[:, 0]).mean() > 30 and (
bboxes_resized[:, 3] - bboxes_resized[:, 1]).mean() > 30 and torch.rand(1) < self.tiling_p:
                tiled = True
                tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
                img, bboxes, density_map, gt_bboxes = tiling_augmentation(
                    img, bboxes, self.resize,
                    self.jitter, tile_size, self.horizontal_flip_p, gt_bboxes=gt_bboxes, density_map=density_map
                )
                padwh = (0, 0)  # the tiled image fills the full canvas, no padding
else:
img = self.jitter(img)
                img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = resize_and_pad(
                    img, bboxes, density_map, gt_bboxes=gt_bboxes, train=True)
if not tiled and torch.rand(1) < self.horizontal_flip_p:
img = TVF.hflip(img)
density_map = TVF.hflip(density_map)
bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]
        else:
            # at test time, build the multiscale tiled input (no augmentation)
            img, bboxes, density_map, gt_bboxes, scaling_factor, padwh = tile_multiscale(
                img, bboxes, density_map, gt_bboxes=gt_bboxes)
original_sum = density_map.sum()
density_map = self.resize512(density_map)
density_map = density_map / density_map.sum() * original_sum
gt_bboxes = torch.clamp(gt_bboxes, min=0, max=1024)
img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
# if self.split == 'train' or self.training:
# return img, bboxes, density_map, torch.tensor(idx), gt_bboxes
# else:
return img, bboxes, density_map, torch.tensor(idx), gt_bboxes, torch.tensor(scaling_factor), padwh
def __len__(self):
return len(self.image_names)
    def map_img_name_to_ori_id(self):
        # map COCO file_name -> image id
        return {v["file_name"]: v["id"] for v in self.labels.imgs.values()}
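# Expected FSC147 directory layout, as referenced by the class above:
#
#   <data_path>/
#       annotations/Train_Test_Val_FSC_147.json
#       annotations/annotation_FSC147_384.json
#       annotations/instances_<split>.json
#       images_384_VarV2/
#       gt_density_map_adaptive_512_512_object_VarV2/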
class LVISDatasetBOX(Dataset):
def __init__(
self, data_path, img_size, split='train', num_objects=3,
tiling_p=0.5, zero_shot=False, return_ids=False
):
self.split = split
self.data_path = data_path
self.horizontal_flip_p = 0.5
self.tiling_p = tiling_p
self.img_size = img_size
self.resize = T.Resize((img_size, img_size), antialias=True)
self.resize512 = T.Resize((512, 512), antialias=True)
self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)
self.num_objects = num_objects
self.zero_shot = zero_shot
self.return_ids = return_ids
self.img_path = os.path.join(data_path, "images")
self.labels = COCO(os.path.join(self.data_path, 'annotations', 'unseen_instances_' + split + '.json'))
self.image_ids = self.labels.getImgIds()
self.count_anno = self.load_json(os.path.join(data_path, "annotations", "unseen_count_" + split + ".json"))
self.img_name_to_ori_id = self.map_img_name_to_ori_id()
def load_json(self, json_file):
with open(json_file, "r") as f:
data = json.load(f)
return data
def __getitem__(self, idx: int):
img_id = self.image_ids[idx]
img_info = self.labels.loadImgs([img_id])[0]
img_file = img_info["file_name"]
img = Image.open(os.path.join(self.img_path, img_file)).convert("RGB")
ann_ids = self.labels.getAnnIds([img_id])
anns = self.labels.loadAnns(ids=ann_ids)
        # collect instance boxes and convert xywh -> x1y1x2y2 as float32
        gt_bboxes = torch.tensor(
            [xywh_to_x1y1x2y2(instance["bbox"]) for instance in anns],
            dtype=torch.float32
        )
bboxes = self.count_anno["annotations"][idx]["boxes"]
bboxes = torch.tensor([xywh_to_x1y1x2y2(bbox) for bbox in bboxes], dtype=torch.float32)[:3]
        img = T.ToTensor()(img)
density_map = torch.zeros((512,512)).unsqueeze(0)
# data augmentation
tiled = False
if self.split == 'train' and torch.rand(1) < self.tiling_p:
tiled = True
tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
            # tiling_augmentation returns a 4-tuple when gt_bboxes is given
            img, bboxes, density_map, gt_bboxes = tiling_augmentation(
                img, bboxes, self.resize,
                self.jitter, tile_size, self.horizontal_flip_p, gt_bboxes=gt_bboxes, density_map=density_map
            )
            scaling_factor, (pad_width, pad_height) = 1.0, (0, 0)  # no resize/pad bookkeeping when tiled
else:
img, bboxes, density_map, gt_bboxes, scaling_factor, (pad_width, pad_height) = resize_and_pad(img, bboxes, density_map, gt_bboxes=gt_bboxes)
if self.split == 'train':
if not tiled:
img = self.jitter(img)
img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
img = TVF.hflip(img)
density_map = TVF.hflip(density_map)
bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
gt_bboxes[:, [0, 2]] = self.img_size - gt_bboxes[:, [2, 0]]
return img, bboxes, density_map, torch.tensor(img_id), gt_bboxes, scaling_factor, (pad_width, pad_height)
def __len__(self):
return len(self.image_ids)
    def map_img_name_to_ori_id(self):
        # map COCO file_name -> image id
        return {v["file_name"]: v["id"] for v in self.labels.imgs.values()}
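# Expected LVIS data layout, as referenced by the class above:
#
#   <data_path>/
#       images/
#       annotations/unseen_instances_<split>.json
#       annotations/unseen_count_<split>.json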
# MULTISCALE IMAGES
def tile_multiscale(img, bboxes, density_map, gt_bboxes, size=1024.0, zero_shot=False, train=False):
    # build a 1024x1024 canvas: one copy of the image resized so its longer
    # side is 512, composited over half-scale repetitions of the same image
    channels, original_height, original_width = img.shape
longer_dimension = max(original_height, original_width)
scaling_factor = 512 / longer_dimension
scaled_bboxes = bboxes * scaling_factor
resized_img = torch.nn.functional.interpolate(img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear',
align_corners=False)
size = int(size)
pad_height = max(0, size - resized_img.shape[2])
pad_width = max(0, size - resized_img.shape[3])
padded_img = torch.nn.functional.pad(resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]
resized_img2 = torch.nn.functional.interpolate(img.unsqueeze(0), scale_factor=scaling_factor / 2, mode='bilinear',
align_corners=False)[0]
    h2, w2 = resized_img2.shape[1], resized_img2.shape[2]
    # fill the 1024x1024 canvas with repetitions of the half-scale image
    padded_img2 = torch.nn.functional.pad(resized_img2, (0, 1024 - w2, 0, 1024 - h2), mode='constant', value=0)
    for i in range(0, 1024, h2):
        for j in range(0, 1024, w2):
            tile_h, tile_w = padded_img2[:, i:i + h2, j:j + w2].shape[1:]
            padded_img2[:, i:i + tile_h, j:j + tile_w] = resized_img2[:, :tile_h, :tile_w]
    # overwrite the tiled background with the large 512-scale copy
    padded_img2[padded_img != 0] = padded_img[padded_img != 0]
    # NOTE: the multiscale composite (padded_img2) and scaled_bboxes are
    # computed but not returned; the plain padded image and unscaled boxes are
    # passed on, with placeholder scale/padding values.
    return padded_img, bboxes, density_map, gt_bboxes, 0, (0, 0)
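# Illustrative call (shapes hypothetical); the longer side is scaled to 512 and
# the result padded to a 1024x1024 canvas:
#
#   img = torch.rand(3, 768, 1024)
#   boxes = torch.tensor([[0., 0., 50., 50.]])
#   dmap = torch.rand(1, 512, 512)
#   out, boxes, dmap, gt, sf, padwh = tile_multiscale(img, boxes, dmap, gt_bboxes=boxes)
#   assert out.shape == (3, 1024, 1024)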
def resize_and_pad(img, bboxes, density_map=None, gt_bboxes=None, size=1024.0, zero_shot=False, train=False):
resize512 = T.Resize((512, 512), antialias=True)
channels, original_height, original_width = img.shape
longer_dimension = max(original_height, original_width)
scaling_factor = size / longer_dimension
scaled_bboxes = bboxes * scaling_factor
if not zero_shot and not train:
a_dim = ((scaled_bboxes[:, 2] - scaled_bboxes[:, 0]).mean() + (
scaled_bboxes[:, 3] - scaled_bboxes[:, 1]).mean()) / 2
scaling_factor = min(1.0, 80 / a_dim.item()) * scaling_factor
resized_img = torch.nn.functional.interpolate(img.unsqueeze(0), scale_factor=scaling_factor, mode='bilinear',
align_corners=False)
size = int(size)
pad_height = max(0, size - resized_img.shape[2])
pad_width = max(0, size - resized_img.shape[3])
padded_img = torch.nn.functional.pad(resized_img, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]
    if density_map is not None:
        original_sum = density_map.sum()
        _, img_h, img_w = img.shape
        # upsample the density map to the image resolution, apply the same
        # scaling as the image, pad, and resize to the 512x512 target
        resized_density_map = torch.nn.functional.interpolate(density_map.unsqueeze(0), size=(img_h, img_w),
                                                              mode='bilinear', align_corners=False)
        resized_density_map = torch.nn.functional.interpolate(resized_density_map, scale_factor=scaling_factor,
                                                              mode='bilinear', align_corners=False)
        padded_density_map = \
            torch.nn.functional.pad(resized_density_map, (0, pad_width, 0, pad_height), mode='constant', value=0)[0]
        padded_density_map = resize512(padded_density_map)
        # renormalize so the map still integrates to the original object count
        padded_density_map = padded_density_map / padded_density_map.sum() * original_sum
    bboxes = bboxes * scaling_factor
    if gt_bboxes is None and density_map is None:
        return padded_img, bboxes, scaling_factor
    gt_bboxes = gt_bboxes * scaling_factor
    return padded_img, bboxes, padded_density_map, gt_bboxes, scaling_factor, (pad_width, pad_height)
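# Sketch with hypothetical shapes: resize_and_pad preserves the density-map
# integral, so the object count survives the resize:
#
#   img = torch.rand(3, 600, 800)
#   boxes = torch.tensor([[10., 10., 90., 90.]])
#   dmap = torch.rand(1, 512, 512)
#   img_p, boxes_p, dmap_p, gt_p, sf, padwh = resize_and_pad(
#       img, boxes, dmap, gt_bboxes=boxes, train=True)
#   # dmap_p.sum() is approximately dmap.sum(); sf == 1024 / 800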
# MCAC DATASET
# (json, os, numpy, torch, Image, Dataset, T and pad_sequence are already
# imported at the top of this file)
import random

import torchvision.transforms.functional as trans_F
from PIL import ImageFile
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torchvision import transforms
def pad_collate_mcac(batch):
(img, bboxes, image_names, gt_bboxes) = zip(*batch)
gt_bboxes_pad = pad_sequence(gt_bboxes, batch_first=True, padding_value=0)
img = torch.stack(img)
bboxes = torch.stack(bboxes)
image_names = torch.stack(image_names)
gt_bboxes = gt_bboxes_pad
return img, bboxes, image_names, gt_bboxes
IM_NORM_MEAN = [0.485, 0.456, 0.406]
IM_NORM_STD = [0.229, 0.224, 0.225]
Normalize_tensor = transforms.Compose(
[transforms.Normalize(mean=IM_NORM_MEAN, std=IM_NORM_STD)]
)
def denormalize(tensor, means=IM_NORM_MEAN, stds=IM_NORM_STD, clip_0_1=True):
with torch.no_grad():
denormalized = tensor.clone()
for channel, mean, std in zip(denormalized, means, stds):
channel.mul_(std).add_(mean)
if clip_0_1:
channel[channel < 0] = 0
channel[channel > 1] = 1
return denormalized
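# Round-trip sketch: Normalize_tensor followed by denormalize recovers the
# input up to float error:
#
#   x = torch.rand(3, 64, 64)
#   y = denormalize(Normalize_tensor(x))
#   assert torch.allclose(x, y, atol=1e-6)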
class MCAC_Dataset(Dataset):
def __init__(self, data_path,
image_size,
split='train',
num_objects=3,
tiling_p=0.5,
zero_shot=False,
training=True
):
ImageFile.LOAD_TRUNCATED_IMAGES = True
self.img_size = (image_size, image_size)
self.img_channels = 3
self.split = split
self.training = training
if split != 'train':
# load json with exemplars
with open(f"{data_path}/{self.split}_eval_bboxes.json", "r") as f:
self.exemplars = json.load(f)
self.im_dir = f"{data_path}/{self.split}"
CFG = dict()
CFG["MCAC_occ_limit"] = 70
CFG["MCAC_occ_limit_exemplar"] = 30
CFG["MCAC_crop_size"] = 672
self.gs_file = f"_c_8"
self.gs_file += "_occ_" + str(int(CFG["MCAC_occ_limit"])) if CFG["MCAC_occ_limit"] != -1 else ""
self.gs_file += "_non_int"
self.gs_file += f"_crop{CFG['MCAC_crop_size']}" if CFG["MCAC_crop_size"] != -1 else ""
self.gs_file += "_np"
self.im_ids = [
f for f in os.listdir(self.im_dir) if os.path.isdir(self.im_dir + "/" + f)
]
self.CFG = CFG
self.toten = transforms.ToTensor()
self.resize_im = transforms.Resize((self.img_size[0], self.img_size[0]))
self.bboxes_str = "bboxes_crop672"
self.centers_str = "centers"
self.occlusions_str = "occlusions_crop672"
self.area_str = "area"
self.json_p = f"info_with_occ_bbox.json"
# CFG["MCAC_exclude_imgs_with_num_classes_over"] = 1
# self.exlude_images_num_class()
print(
f"{self.split} set, size:{len(self.im_ids)}")
def __len__(self):
return len(self.im_ids)
def __getitem__(self, idx):
im_id = self.im_ids[idx]
image = Image.open(f"{self.im_dir}/{im_id}/img.png")
image.load()
if image.mode != "RGB":
image = image.convert("RGB")
image = self.toten(image)
if self.CFG["MCAC_crop_size"] != -1:
crop_boundary_size_0 = int(
(image.shape[1] - self.CFG["MCAC_crop_size"]) / 2
)
crop_boundary_size_1 = int(
(image.shape[2] - self.CFG["MCAC_crop_size"]) / 2
)
image = image[
:,
crop_boundary_size_0:-crop_boundary_size_0,
crop_boundary_size_1:-crop_boundary_size_1,
]
with open(f"{self.im_dir}/{im_id}/{self.json_p}", "r") as f:
img_info = json.load(f)
if self.split == 'train' and self.training:
            # pick a random countable class and use its boxes
            chosen_class = random.randint(0, len(img_info["countables"]) - 1)
            # three exemplar boxes are sampled from this class below
occlusions = torch.tensor(img_info["countables"][chosen_class][self.occlusions_str])
all_bboxes = torch.tensor(img_info["countables"][chosen_class][self.bboxes_str], dtype=torch.float32)
all_bboxes[:, :, 0] = all_bboxes[:, :, 0] / (image.shape[1] / self.img_size[0])
all_bboxes[:, :, 1] = all_bboxes[:, :, 1] / (image.shape[2] / self.img_size[1])
all_bboxes = torch.clip(
all_bboxes, 0, self.img_size[0] - 1
)
all_bboxes = all_bboxes.reshape(-1, 4)
            all_bboxes = torch.stack(
                (all_bboxes[:, 2], all_bboxes[:, 0], all_bboxes[:, 3], all_bboxes[:, 1]),
                dim=1,
            )
gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
exemplar_candidates = all_bboxes[occlusions < self.CFG["MCAC_occ_limit_exemplar"]]
            if len(exemplar_candidates) < 3:
                # fall back to the three least-occluded boxes
                exemplar_candidates = all_bboxes[occlusions.argsort()][:3]
exemplar_ids = torch.randperm(exemplar_candidates.shape[0])[:3]
exemplar_bboxes = exemplar_candidates[exemplar_ids]
image = self.resize_im(image)
image = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)
return (
image,
exemplar_bboxes,
torch.tensor(idx),
gt_bboxes
)
bboxes = []
e_bboxes = []
for c_i, c in enumerate(img_info["countables"]):
occlusions = torch.tensor(img_info["countables"][c_i][self.occlusions_str])
all_bboxes = torch.tensor(img_info["countables"][c_i][self.bboxes_str], dtype=torch.float32)
all_bboxes[:, :, 0] = all_bboxes[:, :, 0] / (image.shape[1] / self.img_size[0])
all_bboxes[:, :, 1] = all_bboxes[:, :, 1] / (image.shape[2] / self.img_size[1])
all_bboxes = torch.clip(
all_bboxes, 0, self.img_size[0] - 1
)
all_bboxes = all_bboxes.reshape(-1, 4)
            all_bboxes = torch.stack(
                (all_bboxes[:, 2], all_bboxes[:, 0], all_bboxes[:, 3], all_bboxes[:, 1]),
                dim=1,
            )
gt_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit"]]
if self.split == 'train':
exemplar_bboxes = all_bboxes[occlusions < self.CFG["MCAC_occ_limit_exemplar"]]
                if len(exemplar_bboxes) < 3:
                    # fall back to the three least-occluded boxes
                    exemplar_bboxes = all_bboxes[occlusions.argsort()][:3]
else:
assert self.exemplars[im_id][c_i]['obj_id'] == c['obj_id']
orig_exemplar_idx = torch.tensor(self.exemplars[im_id][c_i]['eval_bbox_inds'])
exemplar_bboxes = all_bboxes[orig_exemplar_idx]
bboxes.append(gt_bboxes)
e_bboxes.append(exemplar_bboxes)
image = self.resize_im(image)
bboxes = pad_sequence(bboxes, batch_first=True, padding_value=0)
e_bboxes = pad_sequence(e_bboxes, batch_first=True, padding_value=0)
image = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)
return (
image,
e_bboxes,
torch.tensor(idx),
bboxes
)
    def exclude_images_num_class(self):
new_im_ids = []
for id in self.im_ids:
with open(f"{self.im_dir}/{id}/{self.json_p}", "r") as f:
img_info = json.load(f)
num_countables = 0
for c in img_info["countables"]:
if self.CFG["MCAC_occ_limit"] != -1:
assert len(c[self.occlusions_str]) == len(c["inds"])
cnt_np = np.array(c[self.occlusions_str])
inds = cnt_np < self.CFG["MCAC_occ_limit"]
cnt_np = cnt_np[inds]
cnt = len(cnt_np)
else:
cnt = len(c["inds"])
if cnt >= 1:
num_countables += 1
if (
num_countables
<= self.CFG["MCAC_exclude_imgs_with_num_classes_over"]
):
new_im_ids.append(id)
        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_num_classes_over']} classes, "
            f"from {len(self.im_ids)} to {len(new_im_ids)} images"
        )
self.im_ids = new_im_ids
    def exclude_images_counts(self):
new_im_ids = []
all_counts = []
for id in self.im_ids:
with open(f"{self.im_dir}/{id}/{self.json_p}", "r") as f:
img_info = json.load(f)
include = True
for c in img_info["countables"]:
if self.CFG["MCAC_occ_limit"] != -1:
assert len(c[self.occlusions_str]) == len(c["inds"])
cnt_np = np.array(c[self.occlusions_str])
inds = cnt_np < self.CFG["MCAC_occ_limit"]
cnt_np = cnt_np[inds]
cnt = len(cnt_np)
else:
cnt = len(c["inds"])
if cnt != 0:
all_counts.append(cnt)
if cnt > self.CFG["MCAC_exclude_imgs_with_counts_over"]:
include = False
if include:
new_im_ids.append(id)
        print(
            f"EXCLUDING OVER LIMIT: {self.CFG['MCAC_exclude_imgs_with_counts_over']} count, "
            f"from {len(self.im_ids)} to {len(new_im_ids)} images"
        )
self.im_ids = new_im_ids
def ref_rot(self, image, dots, rects, density):
if random.random() > 0.5:
image = trans_F.hflip(image)
density = trans_F.hflip(density)
dots = self.hflip_dots(dots)
rects = self.hflip_bboxes(rects)
if random.random() > 0.5:
image = trans_F.vflip(image)
density = trans_F.vflip(density)
dots = self.vflip_dots(dots)
rects = self.vflip_bboxes(rects)
rotate_angle = int(random.random() * 4)
if rotate_angle != 0:
image = trans_F.rotate(image, rotate_angle * 90)
density = trans_F.rotate(density, rotate_angle * 90)
for _i in range(rotate_angle):
dots = self.rotate_dots_90(dots)
rects = self.rotate_bboxes_90(rects)
return image, dots, rects, density
def rotate_bboxes_90(self, rects):
none_rects = rects == -1
new_x_rects = rects[:, :, 0]
new_y_rects = (self.img_size[1] - 1) - rects[:, :, 1]
rects = np.stack((new_y_rects, new_x_rects), axis=-2)
rects[none_rects] = -1
return rects
def rotate_dots_90(self, dots):
none_dots = dots == -1
new_x = dots[:, :, 1]
new_y = (self.img_size[1] - 1) - dots[:, :, 0]
dots = np.stack((new_x, new_y), axis=-1)
dots[none_dots] = -1
return dots
def vflip_bboxes(self, rects):
none_rects = rects == -1
rects[:, :, 0] = (self.img_size[1] - 1) - rects[:, :, 0]
rects[none_rects] = -1
return rects
def vflip_dots(self, dots):
none_dots = dots == -1
dots[:, :, 1] = (self.img_size[1] - 1) - dots[:, :, 1]
dots[none_dots] = -1
return dots
def hflip_bboxes(self, rects):
none_rects = rects == -1
rects[:, :, 1] = (self.img_size[0] - 1) - rects[:, :, 1]
rects[none_rects] = -1
return rects
def hflip_dots(self, dots):
none_dots = dots == -1
dots[:, :, 0] = (self.img_size[0] - 1) - dots[:, :, 0]
dots[none_dots] = -1
return dots
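# Hedged usage sketch (path illustrative): in train mode the 4-tuple
# __getitem__ return matches pad_collate_mcac:
#
#   dataset = MCAC_Dataset('/path/to/MCAC', image_size=512, split='train')
#   loader = DataLoader(dataset, batch_size=2, collate_fn=pad_collate_mcac)
#   img, exemplars, ids, gt_boxes = next(iter(loader))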
def get_loader_counting(CFG):
test_loader = get_dataloader(CFG, train=False)
train_loader = get_dataloader(CFG, train=True)
return train_loader, test_loader
def get_dataloader(CFG, train):
    if CFG["dataset"] == "MCAC" or CFG["dataset"] == "MCAC-M1":
        # NOTE: MCAC_Dataset expects (data_path, image_size, ...); the CFG keys
        # "data_path"/"image_size" and the 'val' eval split are assumptions
        dataset = MCAC_Dataset(
            CFG["data_path"], CFG["image_size"],
            split='train' if train else 'val',
            training=train,
        )
    else:
        raise ValueError(f"Unsupported dataset: {CFG['dataset']}")
    if train:
        bs = CFG["train_batch_size"]
        sampler = RandomSampler(dataset)
    else:
        bs = CFG["eval_batch_size"]
        sampler = SequentialSampler(dataset)
loader = DataLoader(
dataset,
sampler=sampler,
batch_size=bs,
num_workers=CFG["num_workers"],
pin_memory=True,
drop_last=CFG["drop_last"],
)
return loader
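# Hedged example config for get_loader_counting; the "data_path" and
# "image_size" keys are assumptions tied to the constructor call above:
#
#   CFG = {
#       "dataset": "MCAC", "data_path": "/path/to/MCAC", "image_size": 512,
#       "train_batch_size": 2, "eval_batch_size": 1,
#       "num_workers": 4, "drop_last": True,
#   }
#   train_loader, test_loader = get_loader_counting(CFG)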
def generate_density_maps(data_path, target_size=(512, 512)):
density_map_path = os.path.join(
data_path,
f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
)
if not os.path.isdir(density_map_path):
os.makedirs(density_map_path)
with open(
os.path.join(data_path, 'annotation_FSC147_384.json'), 'rb'
) as file:
annotations = json.load(file)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
for i, (image_name, ann) in enumerate(tqdm(annotations.items())):
_, h, w = T.ToTensor()(Image.open(os.path.join(
data_path,
'images_384_VarV2',
image_name
))).size()
h_ratio, w_ratio = target_size[0] / h, target_size[1] / w
points = (
torch.tensor(ann['points'], device=device) *
torch.tensor([w_ratio, h_ratio], device=device)
).long()
        points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
        points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)
        points = points.cpu()  # dmap below is a CPU tensor, so indices must be too
bboxes = box_convert(torch.tensor(
ann['box_examples_coordinates'],
dtype=torch.float32,
device=device
)[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio], device=device)
window_size = bboxes.mean(dim=0)[2:].cpu().numpy()[::-1]
dmap = torch.zeros(*target_size)
for p in range(points.size(0)):
dmap[points[p, 1], points[p, 0]] += 1
dmap = gaussian_filter(dmap.cpu().numpy(), window_size / 8)
np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)
if __name__ == '__main__':
parser = argparse.ArgumentParser("Density map generator", add_help=False)
parser.add_argument(
'--data_path',
default='dpath',
type=str
)
parser.add_argument('--image_size', default=512, type=int)
args = parser.parse_args()
generate_density_maps(args.data_path, (args.image_size, args.image_size))