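"""Data pipeline for YOLO training: a YOLODataset that builds multi-scale
anchor targets (with optional YOLOv5-style mosaic augmentation) and a
YOLODataModule that wraps train/val/test splits for PyTorch Lightning."""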
import os
import random

import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
from PIL import Image, ImageFile
from torch.utils.data import DataLoader, Dataset

from . import config, transforms
from .utils import cells_to_bboxes
from .utils import iou_width_height as iou
from .utils import non_max_suppression as nms
from .utils import plot_image, xyxy2xywhn, xywhn2xyxy

ImageFile.LOAD_TRUNCATED_IMAGES = True


class YOLODataset(Dataset):
    def __init__(
        self,
        csv_file,
        img_dir,
        label_dir,
        anchors,
        image_size=416,
        S=[13, 26, 52],
        C=20,
        transform=None,
        mosaic_percentage=0.67,
    ):
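        """Dataset that yields an image plus anchor-target tensors for 3 scales.

        Args (inferred from how the fields are used below):
            csv_file: CSV whose rows pair an image filename with a label filename.
            img_dir, label_dir: directories those filenames live in.
            anchors: three lists (one per scale) of (w, h) pairs normalized to
                the image size; flattened into a single (9, 2) tensor.
            image_size: side length of the square network input.
            S: grid sizes of the three prediction scales.
            C: number of classes.
            transform: albumentations-style callable taking image= and bboxes=
                keywords and returning a dict with "image" and "bboxes".
            mosaic_percentage: probability of building a 4-image mosaic instead
                of loading a single image.
        """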
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.image_size = image_size
        self.mosaic_border = [image_size // 2, image_size // 2]
        self.transform = transform
        self.S = S
        self.anchors = torch.tensor(
            anchors[0] + anchors[1] + anchors[2]
        )  # for all 3 scales
        self.num_anchors = self.anchors.shape[0]
        self.num_anchors_per_scale = self.num_anchors // 3
        self.C = C
        self.ignore_iou_thresh = 0.5
        self.mosaic_percentage = mosaic_percentage

    def __len__(self):
        return len(self.annotations)

    def load_mosaic(self, index):
        # YOLOv5-style 4-mosaic loader: tiles this image plus 3 random images
        # into one 2s x 2s canvas around a randomly sampled center
        labels4 = []
        s = self.image_size
        yc, xc = (
            int(random.uniform(x, 2 * s - x)) for x in self.mosaic_border
        )  # mosaic center, sampled in [s/2, 3s/2] on each axis
        indices = [index] + random.choices(
            range(len(self)), k=3
        )  # 3 additional random image indices
        random.shuffle(indices)
        for i, index in enumerate(indices):
            # Load image and labels
            label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
            bboxes = np.roll(
                np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1
            ).tolist()
            img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
            img = np.array(Image.open(img_path).convert("RGB"))
            h, w = img.shape[0], img.shape[1]
            labels = np.array(bboxes)

            # Place img in img4
            if i == 0:  # top left
                img4 = np.full(
                    (s * 2, s * 2, img.shape[2]), 114, dtype=np.uint8
                )  # base image with 4 tiles
                x1a, y1a, x2a, y2a = (
                    max(xc - w, 0),
                    max(yc - h, 0),
                    xc,
                    yc,
                )  # xmin, ymin, xmax, ymax (large image)
                x1b, y1b, x2b, y2b = (
                    w - (x2a - x1a),
                    h - (y2a - y1a),
                    w,
                    h,
                )  # xmin, ymin, xmax, ymax (small image)
            elif i == 1:  # top right
                x1a, y1a, x2a, y2a = xc, max(yc - h, 0), min(xc + w, s * 2), yc
                x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h
            elif i == 2:  # bottom left
                x1a, y1a, x2a, y2a = max(xc - w, 0), yc, xc, min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, w, min(y2a - y1a, h)
            elif i == 3:  # bottom right
                x1a, y1a, x2a, y2a = xc, yc, min(xc + w, s * 2), min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h)

            img4[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b]  # img4[ymin:ymax, xmin:xmax]
            padw = x1a - x1b
            padh = y1a - y1b

            # Labels: normalized xywh -> pixel xyxy in mosaic coordinates
            if labels.size:
                labels[:, :-1] = xywhn2xyxy(labels[:, :-1], w, h, padw, padh)
            labels4.append(labels)

        # Concat labels, clip to the mosaic canvas, convert back to normalized xywh
        labels4 = np.concatenate(labels4, 0)
        np.clip(labels4[:, :-1], 0, 2 * s, out=labels4[:, :-1])
        labels4[:, :-1] = xyxy2xywhn(labels4[:, :-1], 2 * s, 2 * s)
        labels4[:, :-1] = np.clip(labels4[:, :-1], 0, 1)
        # Drop boxes whose width or height collapsed to zero after clipping
        labels4 = labels4[labels4[:, 2] > 0]
        labels4 = labels4[labels4[:, 3] > 0]
        return img4, labels4

    def load_single_img(self, index):
        label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
        bboxes = np.roll(
            np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1
        ).tolist()
        img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
        image = np.array(Image.open(img_path).convert("RGB"))
        return image, bboxes
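
    # Note on label files: both load_mosaic and load_single_img expect
    # YOLO-style annotations, one object per row:
    #
    #     class x_center y_center width height
    #
    # with coordinates normalized to [0, 1]. The np.roll(..., 4, axis=1) call
    # rotates each row to [x, y, w, h, class] so the class id sits in the last
    # column, which is the layout __getitem__ expects.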

    def __getitem__(self, index):
        if random.random() < self.mosaic_percentage:
            image, bboxes = self.load_mosaic(index)
        else:
            image, bboxes = self.load_single_img(index)

        if self.transform:
            augmentations = self.transform(image=image, bboxes=bboxes)
            image = augmentations["image"]
            bboxes = augmentations["bboxes"]

        # One target tensor per scale, e.g. (3, 13, 13, 6), (3, 26, 26, 6),
        # (3, 52, 52, 6), where 6 = [obj, x, y, w, h, class] for each anchor box
        targets = [torch.zeros((self.num_anchors // 3, S, S, 6)) for S in self.S]
        for box in bboxes:
            # Rank all 9 anchors by shape similarity (IoU of widths/heights)
            iou_anchors = iou(torch.tensor(box[2:4]), self.anchors)
            anchor_indices = iou_anchors.argsort(descending=True, dim=0)
            x, y, width, height, class_label = box
            has_anchor = [False] * 3  # each scale should get exactly one anchor
            for anchor_idx in anchor_indices:
                scale_idx = anchor_idx // self.num_anchors_per_scale
                anchor_on_scale = anchor_idx % self.num_anchors_per_scale
                S = self.S[scale_idx]
                # Which cell the box center falls in; clamp guards x == 1.0 or y == 1.0
                i, j = min(int(S * y), S - 1), min(int(S * x), S - 1)
                anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]
                if not anchor_taken and not has_anchor[scale_idx]:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = 1
                    x_cell, y_cell = S * x - j, S * y - i  # both in [0, 1]
                    width_cell, height_cell = (
                        width * S,
                        height * S,
                    )  # can be greater than 1 since it's relative to the cell
                    box_coordinates = torch.tensor(
                        [x_cell, y_cell, width_cell, height_cell]
                    )
                    targets[scale_idx][anchor_on_scale, i, j, 1:5] = box_coordinates
                    targets[scale_idx][anchor_on_scale, i, j, 5] = int(class_label)
                    has_anchor[scale_idx] = True
                elif (
                    not anchor_taken
                    and iou_anchors[anchor_idx] > self.ignore_iou_thresh
                ):
                    # Similar enough to the assigned anchor: mark as ignored so
                    # the loss does not penalize this prediction
                    targets[scale_idx][anchor_on_scale, i, j, 0] = -1
        return image, tuple(targets)
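

# Minimal usage sketch for YOLODataset on its own (paths and the CSV layout are
# assumptions mirroring test() below; each CSV row pairs an image filename with
# a label filename):
#
#   dataset = YOLODataset(
#       "path/to/annotations.csv",          # hypothetical paths
#       "path/to/images",
#       "path/to/labels",
#       anchors=config.ANCHORS,
#       S=[13, 26, 52],
#       transform=transforms.test_transforms,
#   )
#   image, targets = dataset[0]             # targets: one tensor per scale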


class YOLODataModule(pl.LightningDataModule):
    def __init__(self, train_csv_path, test_csv_path):
        super().__init__()
        self.train_csv_path = train_csv_path
        self.test_csv_path = test_csv_path
        self.train_dataset = None
        self.eval_dataset = None
        self.test_dataset = None

    def setup(self, stage=None):
        self.train_dataset = YOLODataset(
            self.train_csv_path,
            transform=transforms.train_transforms,
            S=[
                config.IMAGE_SIZE // 32,
                config.IMAGE_SIZE // 16,
                config.IMAGE_SIZE // 8,
            ],
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
            mosaic_percentage=config.TRAIN_MOSAIC_PERCENTAGE,
        )
        # Validation reuses the training CSV but with eval transforms and no
        # mosaic, so metrics are computed on undistorted images
        self.eval_dataset = YOLODataset(
            self.train_csv_path,
            transform=transforms.test_transforms,
            S=[
                config.IMAGE_SIZE // 32,
                config.IMAGE_SIZE // 16,
                config.IMAGE_SIZE // 8,
            ],
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
            mosaic_percentage=0.0,
        )
        self.test_dataset = YOLODataset(
            self.test_csv_path,
            transform=transforms.test_transforms,
            S=[
                config.IMAGE_SIZE // 32,
                config.IMAGE_SIZE // 16,
                config.IMAGE_SIZE // 8,
            ],
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
            mosaic_percentage=config.TEST_MOSAIC_PERCENTAGE,
        )

    def train_dataloader(self):
        return DataLoader(
            dataset=self.train_dataset,
            batch_size=config.BATCH_SIZE,
            shuffle=True,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            drop_last=False,
        )

    def val_dataloader(self):
        return DataLoader(
            dataset=self.eval_dataset,
            batch_size=config.BATCH_SIZE,
            shuffle=False,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            drop_last=False,
        )

    def test_dataloader(self):
        return DataLoader(
            dataset=self.test_dataset,
            batch_size=config.BATCH_SIZE,
            shuffle=False,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            drop_last=False,
        )
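

# Hedged wiring sketch: how this DataModule would plug into a Lightning
# Trainer. YOLOLightningModule is a hypothetical model class (not defined in
# this file), and the CSV paths are placeholders:
#
#   datamodule = YOLODataModule(
#       train_csv_path="path/to/train.csv",
#       test_csv_path="path/to/test.csv",
#   )
#   model = YOLOLightningModule()
#   trainer = pl.Trainer(max_epochs=10, accelerator="auto")
#   trainer.fit(model, datamodule=datamodule)
#   trainer.test(model, datamodule=datamodule)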


def test():
    anchors = config.ANCHORS
    transform = transforms.test_transforms
    dataset = YOLODataset(
        "../data/PASCAL_VOC/2examples.csv",
        "../data/PASCAL_VOC/images",
        "../data/PASCAL_VOC/labels",
        S=[13, 26, 52],
        anchors=anchors,
        transform=transform,
    )
    S = [13, 26, 52]
    # Rescale the normalized anchors to grid units at each scale
    scaled_anchors = torch.tensor(anchors) * (
        torch.tensor(S).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
    )
    loader = DataLoader(dataset=dataset, batch_size=1, shuffle=True)
    for x, y in loader:
        boxes = []
        for i in range(len(y)):  # one set of targets per scale
            anchor = scaled_anchors[i]
            print(anchor.shape)
            print(y[i].shape)
            boxes += cells_to_bboxes(
                y[i], is_preds=False, S=y[i].shape[2], anchors=anchor
            )[0]
        # with iou_threshold=1, nms mostly just filters by the objectness threshold
        boxes = nms(boxes, iou_threshold=1, threshold=0.7, box_format="midpoint")
        print(boxes)
        plot_image(x[0].permute(1, 2, 0).to("cpu"), boxes)


if __name__ == "__main__":
    test()