import random
import warnings

import numpy as np
import keras

from utils.anchors import (
    anchors_for_shape,
    guess_shapes,
    compute_locations,
    compute_interest_sizes,
    get_sample_region,
    AnchorParameters
)
from utils.config import parse_anchor_parameters
from utils.image import (
    TransformParameters,
    adjust_transform_for_image,
    apply_transform,
    preprocess_image,
    resize_image,
)
from utils.transform import transform_aabb


def gaussian_radius_2(det_size, min_overlap=0.7):
    """
    Compute an element-wise gaussian radius for boxes of the given size.

    Three quadratic equations are solved from ``height``, ``width`` and
    ``min_overlap`` and the smallest of the three roots is returned per element.

    Args
        det_size: Pair ``(height, width)``. Both entries must be numpy arrays
            (the results are indexed with ``[None]``), of identical shape.
        min_overlap: Overlap threshold used in the three quadratics.

    Returns
        Array with the same shape as ``height`` / ``width`` holding the
        minimum of the three candidate radii.
    """
    height, width = det_size

    # NOTE(review): each root is computed as (b + sqrt(disc)) / 2, i.e. it is
    # not divided by 2 * a. This is kept exactly as in the original code
    # because changing it would alter the generated targets.
    a1 = 1
    b1 = (height + width)
    c1 = width * height * (1 - min_overlap) / (1 + min_overlap)
    sq1 = np.sqrt(np.clip(b1 ** 2 - 4 * a1 * c1, 0, 1e8))
    r1 = (b1 + sq1) / 2

    a2 = 4
    b2 = 2 * (height + width)
    c2 = (1 - min_overlap) * width * height
    sq2 = np.sqrt(np.clip(b2 ** 2 - 4 * a2 * c2, 0, 1e8))
    r2 = (b2 + sq2) / 2

    a3 = 4 * min_overlap
    b3 = -2 * min_overlap * (height + width)
    c3 = (min_overlap - 1) * width * height
    sq3 = np.sqrt(np.clip(b3 ** 2 - 4 * a3 * c3, 0, 1e8))
    r3 = (b3 + sq3) / 2

    # stack the three candidates on a new leading axis and reduce over it
    return np.min(np.r_[r1[None], r2[None], r3[None]], axis=0)


def cal_gaussian(l, r, t, b):
    """
    Compute a gaussian response for every (location, box) pair.

    Args
        l: Distances from each location to the left edge of each box.
        r: Distances from each location to the right edge of each box.
        t: Distances from each location to the top edge of each box.
        b: Distances from each location to the bottom edge of each box.

        All four arrays must share one shape. Note the parameter order is
        (l, r, t, b); pass them by keyword to avoid mixing the axes up.

    Returns
        Array of the same shape with values in [0, 1]; entries below
        ``eps * max`` are zeroed.
    """
    # half of the (right - left) / (bottom - top) difference, i.e. the offset
    # of the location from the middle of the box along each axis
    x = (r - l) // 2
    y = (b - t) // 2
    radius = gaussian_radius_2(((b + t) // 2 + 1, (r + l) // 2 + 1))
    radius[radius < 0] = 0
    radius = np.asarray(radius, 'int')
    sigma = (2 * radius + 1) / 6.
    # the 0.001 keeps the denominator strictly positive
    h = np.exp(-(x * x + y * y) / (2 * sigma * sigma + 0.001))
    h[h < np.finfo(h.dtype).eps * h.max()] = 0
    return h


class Generator(keras.utils.Sequence):
    """
    Abstract generator class.

    Subclasses provide the dataset access methods (``size``, ``num_classes``,
    ``load_image``, ``load_annotations``, ...); this class groups images into
    batches, augments them and builds the network inputs and targets.
    """

    def __init__(
            self,
            batch_size=1,
            group_method='ratio',  # one of 'none', 'random', 'ratio'
            shuffle_groups=True,
            input_size=512,
            transformations=None,
            compute_shapes=guess_shapes,
            compute_locations=compute_locations,
            compute_interest_sizes=compute_interest_sizes,
            preprocess_image=preprocess_image,
            center_sampling_radius=0,
            anchor_param=None,
            config=None
    ):
        """
        Initialize Generator object.

        Args
            batch_size: The size of the batches to generate.
            group_method: Determines how images are grouped together
                (defaults to 'ratio', one of ('none', 'random', 'ratio')).
            shuffle_groups: If True, shuffles the groups at construction and
                at the end of each epoch.
            input_size: Side length used both as min_side / max_side when
                resizing and as the height / width of the batch canvas.
            transformations: Optional sequence of callables invoked as
                ``transform(image, anno)`` and returning ``(image, anno)``,
                where each annotation row is ``[label, x1, y1, x2, y2]``.
            compute_shapes: Function handler for computing the shapes of the
                pyramid for a given input.
            compute_locations: Function handler for computing center point of
                grid cells in all feature maps.
            compute_interest_sizes: Function handler for computing size limit
                for each location.
            preprocess_image: Function handler for preprocessing an image
                (scaling / normalizing) for passing through a network.
            center_sampling_radius: If > 0, positive locations are selected
                with ``get_sample_region`` using this radius; otherwise every
                location strictly inside a box is a candidate.
            anchor_param: Passed through to ``compute_locations`` and
                ``compute_interest_sizes``; printed when not None.
            config: Stored on the instance; not used by this class itself.
        """
        self.batch_size = int(batch_size)
        self.group_method = group_method
        self.shuffle_groups = shuffle_groups
        self.input_size = input_size
        self.transformations = transformations
        self.compute_shapes = compute_shapes
        self.compute_locations = compute_locations
        self.compute_interest_sizes = compute_interest_sizes
        self.preprocess_image = preprocess_image
        self.use_mosaic = True
        self.config = config

        self.groups = None
        # cursor into self.groups used by __getitem__
        self.current_index = 0
        self.center_sampling_radius = center_sampling_radius
        self.anchor_param = anchor_param
        if anchor_param is not None:
            print('anchor parameters:')
            print(anchor_param.__dict__)

        # Define groups
        self.group_images()

        # Shuffle when initializing
        if self.shuffle_groups:
            random.shuffle(self.groups)

    def on_epoch_end(self):
        """
        Reshuffle the groups (if enabled) and rewind the batch cursor.
        """
        if self.shuffle_groups:
            random.shuffle(self.groups)
        self.current_index = 0

    def size(self):
        """
        Size of the dataset.
        """
        raise NotImplementedError('size method not implemented')

    def num_classes(self):
        """
        Number of classes in the dataset.
        """
        raise NotImplementedError('num_classes method not implemented')

    def has_label(self, label):
        """
        Returns True if label is a known label.
        """
        raise NotImplementedError('has_label method not implemented')

    def has_name(self, name):
        """
        Returns True if name is a known class.
        """
        raise NotImplementedError('has_name method not implemented')

    def name_to_label(self, name):
        """
        Map name to label.
        """
        raise NotImplementedError('name_to_label method not implemented')

    def label_to_name(self, label):
        """
        Map label to name.
        """
        raise NotImplementedError('label_to_name method not implemented')

    def image_aspect_ratio(self, image_index):
        """
        Compute the aspect ratio for an image with image_index.
        """
        raise NotImplementedError('image_aspect_ratio method not implemented')

    def load_image(self, image_index):
        """
        Load an image at the image_index.
        """
        raise NotImplementedError('load_image method not implemented')

    def load_annotations(self, image_index):
        """
        Load annotations for an image_index.
        """
        raise NotImplementedError('load_annotations method not implemented')

    def load_annotations_group(self, group):
        """
        Load annotations for all images in group.

        Each entry must be a dict containing at least 'labels' and 'bboxes'.
        """
        annotations_group = [self.load_annotations(image_index) for image_index in group]
        for annotations in annotations_group:
            assert (isinstance(annotations, dict)), \
                '\'load_annotations\' should return a list of dictionaries, received: {}'.format(
                    type(annotations))
            assert ('labels' in annotations), \
                '\'load_annotations\' should return a list of dictionaries that contain \'labels\' and \'bboxes\'.'
            assert ('bboxes' in annotations), \
                '\'load_annotations\' should return a list of dictionaries that contain \'labels\' and \'bboxes\'.'

        return annotations_group

    def filter_annotations(self, image_group, annotations_group, group):
        """
        Clip boxes to the image and drop the ones that end up degenerate.

        Boxes are clipped in place; a box is removed when, after clipping,
        x2 <= x1, y2 <= y1, x2 <= 0 or y2 <= 0. ``group`` is accepted for
        interface compatibility and is not used.
        """
        for index, (image, annotations) in enumerate(zip(image_group, annotations_group)):
            image_height = image.shape[0]
            image_width = image.shape[1]
            bboxes = annotations['bboxes']
            # x1, y1 are limited to [0, side - 2]; x2, y2 to [1, side - 1]
            bboxes[:, 0] = np.clip(bboxes[:, 0], 0, image_width - 2)
            bboxes[:, 1] = np.clip(bboxes[:, 1], 0, image_height - 2)
            bboxes[:, 2] = np.clip(bboxes[:, 2], 1, image_width - 1)
            bboxes[:, 3] = np.clip(bboxes[:, 3], 1, image_height - 1)

            invalid_indices = np.where(
                (bboxes[:, 2] <= bboxes[:, 0]) |
                (bboxes[:, 3] <= bboxes[:, 1]) |
                (bboxes[:, 2] <= 0) |
                (bboxes[:, 3] <= 0)
            )[0]

            # delete invalid indices from every annotation field
            if len(invalid_indices):
                for k in annotations_group[index].keys():
                    annotations_group[index][k] = np.delete(annotations[k], invalid_indices, axis=0)

        return image_group, annotations_group

    def resize_image(self, image):
        """
        Resize an image using input_size as both min_side and max_side.
        """
        return resize_image(image, min_side=self.input_size, max_side=self.input_size)

    def clip_transformed_annotations(self, image_group, annotations_group, group):
        """
        Clip transformed boxes to the image and drop boxes that are too small.

        Boxes are clipped in place; boxes narrower or shorter than 9 pixels
        after clipping are removed from every annotation field.

        NOTE: images that end up without any box are NOT removed from the
        group; compute_targets skips annotation sets that have no boxes.
        ``group`` is accepted for interface compatibility and is not used.

        Returns
            The (mutated) image_group and annotations_group.
        """
        for index, (image, annotations) in enumerate(zip(image_group, annotations_group)):
            image_height = image.shape[0]
            image_width = image.shape[1]
            bboxes = annotations['bboxes']
            # x1, y1 are limited to [0, side - 2]; x2, y2 to [1, side - 1]
            bboxes[:, 0] = np.clip(bboxes[:, 0], 0, image_width - 2)
            bboxes[:, 1] = np.clip(bboxes[:, 1], 0, image_height - 2)
            bboxes[:, 2] = np.clip(bboxes[:, 2], 1, image_width - 1)
            bboxes[:, 3] = np.clip(bboxes[:, 3], 1, image_height - 1)

            small_indices = np.where(
                (bboxes[:, 2] - bboxes[:, 0] < 9) |
                (bboxes[:, 3] - bboxes[:, 1] < 9)
            )[0]

            # delete too-small boxes from every annotation field
            if len(small_indices):
                for k in annotations_group[index].keys():
                    annotations_group[index][k] = np.delete(annotations[k], small_indices, axis=0)

        return image_group, annotations_group

    def load_image_group(self, group):
        """
        Load images for all images in a group.
        """
        return [self.load_image(image_index) for image_index in group]

    def random_transform_group_entry(self, image, annotations):
        """
        Randomly transforms image and annotation.

        Every transformation receives the image together with an (n, 5) array
        of ``[label, x1, y1, x2, y2]`` rows and returns the transformed image
        and rows; the annotations dict is updated in place after each one.
        """
        if self.transformations is not None:
            for transform in self.transformations:
                labels = np.copy(annotations['labels'])
                labels = np.expand_dims(labels, axis=-1)
                bboxes = np.copy(annotations['bboxes'])
                anno = np.concatenate([labels, bboxes], axis=-1)
                image, anno_n = transform(image, anno)
                annotations['labels'] = np.asarray([item[0] for item in anno_n])
                # reshape keeps a (0, 4) shape when no box survives
                annotations['bboxes'] = np.reshape(np.asarray([item[1:] for item in anno_n]), (-1, 4))
        return image, annotations

    def random_transform_group(self, image_group, annotations_group):
        """
        Randomly transforms each image and its annotations.
        """
        assert (len(image_group) == len(annotations_group))

        for index in range(len(image_group)):
            # transform a single group entry
            image_group[index], annotations_group[index] = self.random_transform_group_entry(
                image_group[index], annotations_group[index])

        return image_group, annotations_group

    def mosaic_group(self, image_group, annotations_group):
        """
        With probability 1/2 per entry, replace it by a mosaic of itself and
        three randomly drawn dataset images.
        """
        assert (len(image_group) == len(annotations_group))

        for index in range(len(image_group)):
            if self.use_mosaic and random.getrandbits(1):
                img0 = image_group[index]
                anno0 = annotations_group[index]
                # three extra image indices, drawn with replacement
                group_n = np.random.choice(self.size(), 3)
                image_group_n = self.load_image_group(group_n)
                annotations_group_n = self.load_annotations_group(group_n)
                # imported lazily so the dependency is only needed when used
                from augmentor.mix import mosaic
                image_group[index], annotations_group[index] = mosaic(
                    [img0] + image_group_n,
                    [anno0] + annotations_group_n,
                    inp_size=self.input_size)

        return image_group, annotations_group

    def group_images(self):
        """
        Order the images according to self.group_method and make groups of
        self.batch_size.
        """
        # determine the order of the images
        order = list(range(self.size()))
        if self.group_method == 'random':
            random.shuffle(order)
        elif self.group_method == 'ratio':
            order.sort(key=lambda x: self.image_aspect_ratio(x))

        # divide into groups, one group = one batch; the last group wraps
        # around to the start of the order so that every group is full
        self.groups = [[order[x % len(order)] for x in range(i, i + self.batch_size)]
                       for i in range(0, len(order), self.batch_size)]

    def compute_inputs(self, image_group):
        """
        Compute inputs for the network using an image_group.

        Returns
            Array of shape (len(image_group), input_size, input_size, 3) with
            every preprocessed image copied to the upper left corner.
        """
        batch_shape = (self.input_size, self.input_size, 3)

        # construct an image batch object
        image_batch = np.zeros((len(image_group),) + batch_shape, dtype=keras.backend.floatx())

        # copy all images to the upper left part of the image batch object
        for image_index, image in enumerate(image_group):
            image = self.preprocess_image(image)
            image_batch[image_index, :image.shape[0], :image.shape[1], :image.shape[2]] = image

        return image_batch

    def compute_targets(self, image_group, annotations_group):
        """
        Compute target outputs for the network using images and their annotations.

        With m locations over all pyramid levels, the returned list holds
            batch_regression: (batch, m, 6) - l, t, r, b of the assigned box,
                centerness, and a positive-location flag in the last channel.
            batch_classification: (batch, m, num_classes + 1) - one-hot label
                at positive locations plus a positive flag in the last channel.
            batch_centerness: (batch, m, 2) - centerness and a positive flag.

        Annotation sets without boxes leave their batch entry all zeros.
        """
        INF = 1e8

        assert (len(image_group) == len(
            annotations_group)), "The length of the images and annotations need to be equal."
        assert (len(annotations_group) > 0), "No data received to compute anchor targets for."
        for annotations in annotations_group:
            assert ('bboxes' in annotations), "Annotations should contain bboxes."
            assert ('labels' in annotations), "Annotations should contain labels."

        # get the max image shape
        max_shape = tuple(max(image.shape[x] for image in image_group) for x in range(3))
        feature_shapes = self.compute_shapes(max_shape, pyramid_levels=(3, 4, 5, 6, 7))
        # list of np.array, one per pyramid level
        locations = self.compute_locations(feature_shapes, self.anchor_param)
        num_locations_each_layer = [location.shape[0] for location in locations]
        # (m, 2) m=sum(fh*fw)
        locations = np.concatenate(locations, axis=0)
        # (m, 2)
        interest_sizes = self.compute_interest_sizes(num_locations_each_layer, self.anchor_param)

        batch_size = len(image_group)
        num_locations = locations.shape[0]
        num_classes = self.num_classes()
        float_type = keras.backend.floatx()
        batch_regression = np.zeros((batch_size, num_locations, 4 + 1 + 1), dtype=float_type)
        batch_classification = np.zeros((batch_size, num_locations, num_classes + 1), dtype=float_type)
        # NOTE: batch_hm is filled below but is not part of the returned list
        batch_hm = np.zeros((batch_size, num_locations, num_classes), dtype=float_type)
        batch_centerness = np.zeros((batch_size, num_locations, 1 + 1), dtype=float_type)

        # (m, ), (m, )
        cx, cy = locations[:, 0], locations[:, 1]
        location_range = np.arange(num_locations)

        for batch_item_id, annotations in enumerate(annotations_group):
            # (n, 4)
            bboxes = annotations['bboxes']
            if bboxes.shape[0] == 0:
                # nothing to assign, targets stay zero
                continue
            # (n, )
            bbox_areas = (bboxes[:, 3] - bboxes[:, 1]) * (bboxes[:, 2] - bboxes[:, 0])
            # (n, )
            labels = np.asarray(annotations['labels'], 'int')

            # (m, 1) - (1, n) --> (m, n)
            left = cx[:, None] - bboxes[:, 0][None]
            top = cy[:, None] - bboxes[:, 1][None]
            # (1, n) - (m, 1) --> (m, n)
            right = bboxes[:, 2][None] - cx[:, None]
            bottom = bboxes[:, 3][None] - cy[:, None]
            # (m, n, 4)
            regr_targets = np.stack([left, top, right, bottom], axis=2)

            # (m, n); keywords are used because cal_gaussian takes (l, r, t, b)
            hm_targets = cal_gaussian(l=left, r=right, t=top, b=bottom)

            # (m, n)
            if self.center_sampling_radius > 0:
                is_in_bbox = get_sample_region(
                    bboxes,
                    self.anchor_param,
                    num_locations_each_layer,
                    cx,
                    cy,
                    radius=self.center_sampling_radius
                )
            else:
                is_in_bbox = regr_targets.min(axis=2) > 0

            # (m, n)
            max_regr_target = regr_targets.max(axis=2)
            # limit the regression range for each location
            # (m, n)
            is_cared_in_level = (max_regr_target >= interest_sizes[:, 0:1]) & \
                                (max_regr_target <= interest_sizes[:, 1:2])

            locations_to_gt_areas = np.tile(bbox_areas[None], (num_locations, 1))
            locations_to_gt_areas[~is_in_bbox] = INF
            locations_to_gt_areas[~is_cared_in_level] = INF

            # if there are still more than one objects for a location,
            # we choose the one with minimal area
            locations_to_min_area = locations_to_gt_areas.min(axis=1)
            pos_location_indices = np.where(locations_to_min_area != INF)[0]
            locations_to_min_area_ind = locations_to_gt_areas.argmin(axis=1)

            # (m, 4)
            regr_targets = regr_targets[location_range, locations_to_min_area_ind]
            # (m, 2)
            left_right = regr_targets[:, [0, 2]]
            top_bottom = regr_targets[:, [1, 3]]

            # heat map: per class, the strongest response over boxes of that class
            hm_targets[~is_cared_in_level] = 0
            hm_cls_targets = np.tile(hm_targets[:, None, :], (1, num_classes, 1))
            for i in range(num_classes):
                hm_cls_targets[:, i, labels != i] = 0
            hm_max_targets = np.max(hm_cls_targets, axis=-1)

            # (m, )
            centerness = (left_right.min(axis=-1) / left_right.max(axis=-1)) * \
                         (top_bottom.min(axis=-1) / top_bottom.max(axis=-1))
            # abs() because locations outside the assigned box give negative ratios
            centerness_targets = np.sqrt(np.abs(centerness))

            # (m, )
            location_labels = labels[locations_to_min_area_ind]
            pos_location_labels = location_labels[pos_location_indices]

            batch_regression[batch_item_id, :, :4] = regr_targets
            batch_regression[batch_item_id, :, 4] = centerness_targets
            batch_regression[batch_item_id, pos_location_indices, -1] = 1
            batch_classification[batch_item_id, pos_location_indices, pos_location_labels] = 1
            batch_classification[batch_item_id, pos_location_indices, -1] = 1
            batch_centerness[batch_item_id, :, 0] = centerness_targets
            batch_centerness[batch_item_id, pos_location_indices, -1] = 1
            batch_hm[batch_item_id, :, :] = hm_max_targets

        return [batch_regression, batch_classification, batch_centerness]

    def compute_input_output(self, group):
        """
        Compute inputs and target outputs for the network.

        Returns
            ``(inputs, targets)``, or ``(None, None)`` when no image is left
            in the group.
        """
        # load images and annotations
        image_group = self.load_image_group(group)
        annotations_group = self.load_annotations_group(group)

        if self.use_mosaic:
            image_group, annotations_group = self.mosaic_group(image_group, annotations_group)

        # randomly transform data
        image_group, annotations_group = self.random_transform_group(image_group, annotations_group)

        # check validity of annotations
        image_group, annotations_group = self.clip_transformed_annotations(image_group, annotations_group, group)

        if len(image_group) == 0:
            return None, None

        # compute network inputs
        inputs = self.compute_inputs(image_group)

        # compute network targets
        targets = self.compute_targets(image_group, annotations_group)

        return inputs, targets

    def compute_input_output_test(self, group):
        """
        Debug variant of compute_input_output without augmentation.

        Returns
            ``(image_group, annotations_group, targets)`` - the raw images and
            filtered annotations rather than the network input batch.
        """
        # load images and annotations
        image_group = self.load_image_group(group)
        annotations_group = self.load_annotations_group(group)

        # check validity of annotations
        image_group, annotations_group = self.filter_annotations(image_group, annotations_group, group)

        # compute network inputs (the batch itself is not returned)
        inputs = self.compute_inputs(image_group)

        # compute network targets
        targets = self.compute_targets(image_group, annotations_group)

        return image_group, annotations_group, targets

    def update_size(self, input_size):
        """
        Change the input size and propagate it to the last transformation.

        The last transformation, when there is one, gets its ``out_height``
        and ``out_width`` attributes set to ``input_size``. Without any
        transformations only ``self.input_size`` is updated.
        """
        self.input_size = input_size
        if self.transformations:
            last_transform = self.transformations[-1]
            last_transform.out_height = input_size
            last_transform.out_width = input_size

    def __len__(self):
        """
        Number of batches for generator.
        """
        return len(self.groups)

    def __getitem__(self, index):
        """
        Keras sequence method for generating batches.

        NOTE: ``index`` is ignored. Batches are served from an internal
        cursor (``self.current_index``) that advances by one per call, wraps
        around at the end and skips groups that produce no inputs.
        """
        num_groups = len(self.groups)
        group = self.groups[self.current_index]
        inputs, targets = self.compute_input_output(group)
        while inputs is None:
            # this group yielded nothing, try the next one
            self.current_index = (self.current_index + 1) % num_groups
            group = self.groups[self.current_index]
            inputs, targets = self.compute_input_output(group)
        self.current_index = (self.current_index + 1) % num_groups
        return inputs, targets