| | from pathlib import Path |
| | import math |
| |
|
| | from rich.console import Console |
| | from rich.table import Table |
| | from rich.pretty import Pretty |
| |
|
| | import numpy as np |
| |
|
| | import pandas as pd |
| |
|
| | import cv2 |
| |
|
| | from sklearn.cluster import MeanShift |
| |
|
| | from skimage.transform import hough_circle, hough_circle_peaks |
| |
|
| |
|
| | import torch |
| | from torch.utils.data import Dataset, DataLoader |
| | from torchvision import transforms |
| | from torchvision.models.detection.faster_rcnn import FastRCNNPredictor |
| |
|
| | from torchvision.models.detection import ( |
| | fasterrcnn_resnet50_fpn_v2, |
| | FasterRCNN_ResNet50_FPN_V2_Weights, |
| | ) |
| |
|
| | import pytorch_lightning as pl |
| | from pytorch_lightning.callbacks import RichProgressBar |
| | from pytorch_lightning import Trainer |
| |
|
| | import albumentations as A |
| | from albumentations.pytorch.transforms import ToTensorV2 |
| |
|
| | import matplotlib.pyplot as plt |
| |
|
| | import com_const as cc |
| | import com_image as ci |
| |
|
| | g_device = ( |
| | "mps" |
| | if torch.backends.mps.is_built() is True |
| | else "cuda" if torch.backends.cuda.is_built() else "cpu" |
| | ) |
| |
|
| |
|
| | def load_tray_image(image_name): |
| | return ci.load_image( |
| | file_name=image_name, path_to_images=cc.path_to_plates, rgb=True |
| | ) |
| |
|
| |
|
| | def build_albumentations( |
| | image_size: int = 10, |
| | gamma=(60, 180), |
| | mean=(0.485, 0.456, 0.406), |
| | std=(0.229, 0.224, 0.225), |
| | ): |
| | return { |
| | "resize": [ |
| | A.Resize(height=image_size * 32 * 2, width=image_size * 32 * 3, p=1) |
| | ], |
| | "train": [ |
| | A.HorizontalFlip(p=0.3), |
| | A.RandomBrightnessContrast( |
| | brightness_limit=0.25, contrast_limit=0.25, p=0.5 |
| | ), |
| | A.RandomGamma(gamma_limit=gamma, p=0.5), |
| | ], |
| | "to_tensor": [A.Normalize(mean=mean, std=std, p=1), ToTensorV2()], |
| | "un_normalize": [ |
| | A.Normalize( |
| | mean=[-m / s for m, s in zip(mean, std)], |
| | std=[1.0 / s for s in std], |
| | always_apply=True, |
| | max_pixel_value=1.0, |
| | ), |
| | ], |
| | } |
| |
|
| |
|
| | def get_augmentations( |
| | image_size: int = 10, |
| | gamma=(60, 180), |
| | kinds: list = ["resize", "to_tensor"], |
| | mean=(0.485, 0.456, 0.406), |
| | std=(0.229, 0.224, 0.225), |
| | inferrence: bool = False, |
| | ): |
| | td_ = build_albumentations( |
| | image_size=image_size, |
| | gamma=gamma, |
| | mean=mean, |
| | std=std, |
| | ) |
| | augs = [] |
| | for k in kinds: |
| | augs += td_[k] |
| | if inferrence is True: |
| | return A.Compose(augs) |
| | else: |
| | return A.Compose( |
| | augs, |
| | bbox_params={"format": "pascal_voc", "label_fields": ["labels"]}, |
| | ) |
| |
|
| |
|
| | def safe_row_col(row, col): |
| | """Ensures that row is a string and col is an integer |
| | Args: |
| | row (int or str): row output must be string |
| | col (int or str): col output must be int |
| | """ |
| | if row is not None and col is not None: |
| | if isinstance(col, str): |
| | row, col = col, row |
| | return row, col |
| |
|
| |
|
| | def _update_axis(axis, image, title=None, fontsize=10, remove_axis=True): |
| | axis.imshow(image, origin="upper") |
| | if title is not None: |
| | axis.set_title(title, fontsize=fontsize) |
| |
|
| |
|
| | def make_patches_grid(images, row_count, col_count=None, figsize=(20, 20)): |
| | col_count = row_count if col_count is None else col_count |
| | _, axii = plt.subplots(row_count, col_count, figsize=figsize) |
| | for ax, image in zip(axii.reshape(-1), images): |
| | if isinstance(image, tuple): |
| | title = image[1] |
| | image = image[0] |
| | else: |
| | title = None |
| | try: |
| | _update_axis(axis=ax, image=image, remove_axis=True, title=title) |
| | except: |
| | pass |
| | ax.set_axis_off() |
| |
|
| | plt.tight_layout() |
| | plt.show() |
| |
|
| |
|
| | def print_boxes( |
| | image_name, |
| | boxes, |
| | highlight=(None, None), |
| | draw_first_line: bool = False, |
| | return_plot: bool = True, |
| | ): |
| | r, c = safe_row_col(*highlight) |
| | image = load_tray_image(image_name=image_name) |
| |
|
| | fnt = cv2.FONT_HERSHEY_SIMPLEX |
| | fnt_scale = 3 |
| | fnt_thickness = 8 |
| |
|
| | column_colors = { |
| | 1: (255, 0, 0), |
| | 2: (0, 0, 255), |
| | 3: (255, 255, 0), |
| | 4: (0, 255, 255), |
| | } |
| |
|
| | for box in boxes[["x1", "y1", "x2", "y2", "cx", "cy", "row", "col"]].values: |
| | color = ( |
| | (255, 0, 255) |
| | if c == box[7] and r == box[6] |
| | else column_colors.get(box[7], (255, 255, 244)) |
| | ) |
| | thickness = 20 if c == box[7] and r == box[6] else 10 |
| | image = cv2.rectangle( |
| | image, |
| | (int(box[0]), int(box[1])), |
| | (int(box[2]), int(box[3])), |
| | color, |
| | thickness, |
| | ) |
| | label = str(box[6]).upper() + str(int(box[7])) |
| | (w, h), _ = cv2.getTextSize(label, fnt, fnt_scale, fnt_thickness) |
| | x, y = (int(box[0]), int(box[1]) - fnt_thickness) |
| | image = cv2.rectangle( |
| | image, |
| | (x - fnt_thickness, y - h - fnt_thickness), |
| | (x + fnt_thickness + w, y + fnt_thickness), |
| | color, |
| | -1, |
| | ) |
| | image = cv2.putText( |
| | image, |
| | label, |
| | (x + fnt_thickness, y), |
| | fnt, |
| | fnt_scale, |
| | (0, 0, 0), |
| | fnt_thickness, |
| | ) |
| |
|
| | if draw_first_line is True: |
| | line = get_first_vert_line(image_name=image_name) |
| | if line is not None: |
| | x1, y1, x2, y2 = line |
| | cv2.line( |
| | image, |
| | [ |
| | int(i) |
| | for i in (np.array([x2, y2]) - np.array([x1, y1])) * 10 |
| | + np.array([x1, y1]) |
| | ], |
| | [ |
| | int(i) |
| | for i in (np.array([x1, y1]) - np.array([x2, y2])) * 10 |
| | + np.array([x2, y2]) |
| | ], |
| | (255, 0, 255), |
| | 20, |
| | lineType=8, |
| | ) |
| |
|
| | if return_plot is True: |
| | plt.figure(figsize=(10, 10)) |
| | plt.imshow(image) |
| | plt.tight_layout() |
| | plt.axis("off") |
| | plt.show() |
| | else: |
| | return image |
| |
|
| |
|
| | def crop_to_vert(image): |
| | return image[0 : image.shape[1] // 2, 0 : image.shape[0] // 3] |
| |
|
| |
|
| | def get_first_vert_line(image_name, min_angle=80, max_angle=100): |
| | r, *_ = cv2.split(load_tray_image(image_name)) |
| |
|
| | red_crop = cv2.normalize( |
| | crop_to_vert(r), |
| | None, |
| | alpha=0, |
| | beta=200, |
| | norm_type=cv2.NORM_MINMAX, |
| | ) |
| |
|
| | lines = cv2.HoughLinesP( |
| | image=ci.close( |
| | cv2.Canny(red_crop, 50, 200, None, 3), |
| | kernel_size=5, |
| | proc_times=5, |
| | ), |
| | rho=1, |
| | theta=np.pi / 180, |
| | threshold=50, |
| | minLineLength=red_crop.shape[0] // 5, |
| | maxLineGap=20, |
| | ) |
| | if lines is not None: |
| | min_x = red_crop.shape[0] |
| | sel_line = None |
| | for _, line in enumerate(lines): |
| | x1, y1, x2, y2 = line[0] |
| | min_angle, max_angle = min(min_angle, max_angle), max(min_angle, max_angle) |
| | line_angle = math.atan2(y2 - y1, x2 - x1) * 180 / math.pi * -1 |
| | if min_angle <= abs(line_angle) <= max_angle and min(x1, x2) < min_x: |
| | min_x = min(x1, x2) |
| | sel_line = (x1, y1, x2, y2) |
| |
|
| | if sel_line is not None: |
| | return sel_line |
| | else: |
| | return None |
| |
|
| |
|
| | def draw_first_line(image_name, dot_size=10, crop_canvas: bool = False): |
| | line = get_first_vert_line(image_name=image_name) |
| | if line is None: |
| | return canvas |
| | x1, y1, x2, y2 = line |
| | canvas = load_tray_image(image_name) |
| | if crop_canvas is True: |
| | canvas = crop_to_vert(canvas) |
| | cv2.circle(canvas, (x1, y1), dot_size, (255, 0, 0)) |
| | cv2.circle(canvas, (x2, y2), dot_size, (0, 255, 0)) |
| | cv2.line(canvas, (x1, y1), (x2, y2), (0, 0, 255), 10) |
| | return canvas |
| |
|
| |
|
| | def get_bbox(image_name, bboxes, row, col): |
| | if isinstance(bboxes, pd.Series): |
| | return bboxes |
| | else: |
| | row, col = safe_row_col(row, col) |
| | return bboxes[ |
| | ( |
| | bboxes.file_name |
| | == (image_name.name if isinstance(image_name, Path) else image_name) |
| | ) |
| | & (bboxes.row == row) |
| | & (bboxes.col == col) |
| | ].iloc[0] |
| |
|
| |
|
| | def get_hough_leaf_disc_circle( |
| | image_name, |
| | bboxes, |
| | row=-1, |
| | col=-1, |
| | padding: int = 10, |
| | allow_move: bool = False, |
| | ): |
| | padded_leaf_disk = get_leaf_disk_wbb( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | padding=padding, |
| | ) |
| | *_, b = cv2.split(padded_leaf_disk) |
| |
|
| | min_t, max_t = 100, 200 |
| | rb = cv2.Canny( |
| | cv2.normalize( |
| | b, |
| | None, |
| | alpha=0, |
| | beta=200, |
| | norm_type=cv2.NORM_MINMAX, |
| | ), |
| | min_t, |
| | max_t, |
| | None, |
| | 3, |
| | ) |
| |
|
| | bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col) |
| | hough_radii = np.arange(bbox.max_size // 2 - 10, bbox.max_size // 2 + 10, 10) |
| | hough_res = hough_circle(rb, hough_radii) |
| |
|
| | |
| | _, cx, cy, radii = hough_circle_peaks( |
| | hough_res, |
| | hough_radii, |
| | min_xdistance=10, |
| | min_ydistance=10, |
| | total_num_peaks=1, |
| | ) |
| |
|
| | cx = cx[0] |
| | cy = cy[0] |
| | r = radii[0] |
| |
|
| | if allow_move is True: |
| | h, w, c = padded_leaf_disk.shape |
| | if cx - r < 0: |
| | cx += abs(r - cx) |
| | if cx + r > w: |
| | cx -= abs(r - cx) |
| | if cy - r < 0: |
| | cy += abs(cy - r) |
| | if cy + r > h: |
| | cy -= abs(cy - r) |
| |
|
| | return dict(cx=cx, cy=cy, r=radii) |
| |
|
| |
|
| | def get_hough_leaf_disk_patch( |
| | image_name, |
| | bboxes, |
| | patch_size=-1, |
| | row=-1, |
| | col=-1, |
| | padding: int = 10, |
| | radius_crop=0, |
| | disc=None, |
| | allow_move: bool = False, |
| | image_folder=None, |
| | ): |
| | if patch_size > 0: |
| | try: |
| | bbox = get_bbox(image_name, bboxes, row, col) |
| | cx = int(bbox.cx) |
| | cy = int(bbox.cy) |
| | except: |
| | return None |
| | patch_size = patch_size // 2 |
| |
|
| | return A.crop( |
| | load_tray_image(image_name, image_folder=image_folder), |
| | cx - patch_size, |
| | cy - patch_size, |
| | cx + patch_size, |
| | cy + patch_size, |
| | ) |
| | else: |
| | if disc is None: |
| | disc = get_hough_leaf_disc_circle( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | padding=padding, |
| | allow_move=allow_move, |
| | ) |
| |
|
| | r = int((disc["r"] - radius_crop) / math.sqrt(2)) |
| | cx = int(disc["cx"]) |
| | cy = int(disc["cy"]) |
| |
|
| | left = cx - r |
| | top = cy - r |
| | right = cx + r |
| | bottom = cy + r |
| |
|
| | return get_leaf_disk_wbb( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | padding=padding, |
| | )[top:bottom, left:right] |
| |
|
| |
|
| | def get_hough_segment_disk( |
| | image_name, |
| | bboxes, |
| | row=-1, |
| | col=-1, |
| | padding: int = 10, |
| | radius_crop=0, |
| | disc=None, |
| | allow_move: bool = False, |
| | ): |
| | if disc is None: |
| | disc = get_hough_leaf_disc_circle( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | padding=padding, |
| | allow_move=allow_move, |
| | ) |
| |
|
| | padded_leaf_disk = get_leaf_disk_wbb( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | padding=padding, |
| | ) |
| | r = int(disc["r"] - radius_crop) |
| | rc = int((disc["r"] - radius_crop) / math.sqrt(2)) |
| | cx = int(disc["cx"]) |
| | cy = int(disc["cy"]) |
| | left = cx - r |
| | top = cy - r |
| | right = cx + r |
| | bottom = cy + r |
| |
|
| | return cv2.bitwise_and( |
| | padded_leaf_disk, |
| | padded_leaf_disk, |
| | mask=cv2.circle(np.zeros_like(padded_leaf_disk[:, :, 0]), (cx, cy), r, 255, -1), |
| | )[top:bottom, left:right] |
| |
|
| |
|
| | def draw_hough_bb_to_patch_process( |
| | image_name, |
| | bboxes, |
| | row=-1, |
| | col=-1, |
| | padding: int = 10, |
| | radius_crop=0, |
| | disc=None, |
| | allow_move: bool = False, |
| | ): |
| | if disc is None: |
| | disc = get_hough_leaf_disc_circle( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | padding=padding, |
| | allow_move=allow_move, |
| | ) |
| |
|
| | padded_leaf_disk = get_leaf_disk_wbb( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | padding=padding, |
| | ) |
| | r = int(disc["r"] - radius_crop) |
| | rc = int((disc["r"] - radius_crop) / math.sqrt(2)) |
| | cx = int(disc["cx"]) |
| | cy = int(disc["cy"]) |
| | left = cx - r |
| | top = cy - r |
| | right = cx + r |
| | bottom = cy + r |
| |
|
| | return cv2.circle( |
| | cv2.circle( |
| | cv2.rectangle( |
| | cv2.rectangle( |
| | padded_leaf_disk, |
| | (cx - rc, cy - rc), |
| | (cx + rc, cy + rc), |
| | (0, 255, 0), |
| | 5, |
| | ), |
| | (left, top), |
| | (right, bottom), |
| | (255, 0, 155), |
| | 5, |
| | ), |
| | (cx, cy), |
| | 10, |
| | (255, 0, 155), |
| | -1, |
| | ), |
| | (cx, cy), |
| | r, |
| | (255, 0, 155), |
| | 5, |
| | ) |
| |
|
| |
|
| | def get_leaf_disk_wbb(image_name, bboxes, row=-1, col=-1, image_path: Path = None): |
| | try: |
| | bbox = get_bbox(image_name, bboxes, row, col) |
| | return load_tray_image(image_name if image_path is None else image_path)[ |
| | int(bbox.y1) : int(bbox.y2), int(bbox.x1) : int(bbox.x2) |
| | ] |
| | except: |
| | return None |
| |
|
| |
|
| | def get_fast_leaf_disc_circle( |
| | image_name, bboxes, row=-1, col=-1, percent_radius: float = 1.0 |
| | ): |
| | bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col) |
| | return int(bbox.cx), int(bbox.cy), int((bbox.max_size / 2) * percent_radius) |
| |
|
| |
|
| | def get_fast_segment_disk( |
| | image_name, |
| | bboxes, |
| | row=-1, |
| | col=-1, |
| | percent_radius: float = 1.0, |
| | image_path: Path = None, |
| | ): |
| | cx, cy, r = get_fast_leaf_disc_circle( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | percent_radius=percent_radius, |
| | ) |
| | src_image = load_tray_image(image_name if image_path is None else image_path) |
| | left = cx - r |
| | top = cy - r |
| | right = cx + r |
| | bottom = cy + r |
| |
|
| | return cv2.bitwise_and( |
| | src_image, |
| | src_image, |
| | mask=cv2.circle(np.zeros_like(src_image[:, :, 0]), (cx, cy), r, 255, -1), |
| | )[top:bottom, left:right] |
| |
|
| |
|
| | def get_fast_leaf_disk_patch( |
| | image_name, |
| | bboxes, |
| | row=-1, |
| | col=-1, |
| | percent_radius: float = 1.0, |
| | image_path: Path = None, |
| | ): |
| | cx, cy, r = get_fast_leaf_disc_circle( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | percent_radius=percent_radius, |
| | ) |
| | r = int(r / math.sqrt(2)) |
| | left = cx - r |
| | top = cy - r |
| | right = cx + r |
| | bottom = cy + r |
| |
|
| | return load_tray_image(image_name if image_path is None else image_path)[ |
| | top:bottom, left:right |
| | ] |
| |
|
| |
|
| | def draw_fast_bb_to_patch_process( |
| | image_name, |
| | bboxes, |
| | row=-1, |
| | col=-1, |
| | percent_radius: float = 1.0, |
| | image_path: Path = None, |
| | add_center: bool = True, |
| | ): |
| | cx, cy, r = get_fast_leaf_disc_circle( |
| | image_name=image_name, |
| | bboxes=bboxes, |
| | row=row, |
| | col=col, |
| | percent_radius=percent_radius, |
| | ) |
| | bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col) |
| | image = load_tray_image(image_name if image_path is None else image_path) |
| | rc = int(r / math.sqrt(2)) |
| |
|
| | cv2.circle(image, (cx, cy), r, color=(255, 0, 155), thickness=5) |
| | if add_center is True: |
| | cv2.circle(image, (cx, cy), 10, color=(255, 0, 155), thickness=-1) |
| | cv2.rectangle(image, (cx - rc, cy - rc), (cx + rc, cy + rc), (0, 255, 0), 5) |
| |
|
| | return image[int(bbox.y1) : int(bbox.y2), int(bbox.x1) : int(bbox.x2)] |
| |
|
| |
|
| | class LeafDiskDetectorDataset(Dataset): |
| | def __init__( |
| | self, |
| | csv, |
| | transform=None, |
| | yxyx: bool = False, |
| | return_id: bool = False, |
| | bboxes: bool = True, |
| | ): |
| | self.boxes = csv.copy() |
| | self.images = list(self.boxes.plate_name.unique()) |
| | self.transforms = transform |
| | if transform is not None: |
| | self.width, self.height = transform[0].width, transform[0].height |
| | else: |
| | self.width, self.height = 0, 0 |
| | self.yxyx = yxyx |
| | self.return_id = return_id |
| | self.bboxes = bboxes |
| |
|
| | def __len__(self): |
| | return len(self.images) |
| |
|
| | def load_boxes(self, idx): |
| | if "x" in self.boxes.columns: |
| | boxes = self.boxes[self.boxes.plate_name == self.images[idx]].dropna() |
| | size = boxes.shape[0] |
| | return ( |
| | (size, boxes[["x1", "y1", "x2", "y2"]].values) if size > 0 else (0, []) |
| | ) |
| | return 0, [] |
| |
|
| | def load_tray_image(self, idx): |
| | return load_tray_image(self.images[idx]) |
| |
|
| | def get_by_sample_name(self, plate_name): |
| | return self[self.images.index(plate_name)] |
| |
|
| | def get_image_by_name(self, plate_name): |
| | return load_tray_image(plate_name) |
| |
|
| | def draw_image_with_boxes(self, plate_name): |
| | image, labels, *_ = self[self.images.index(plate_name)] |
| | boxes = labels[self.get_boxes_key()] |
| | for box in boxes: |
| | box_indexes = [1, 0, 3, 2] if self.yxyx is True else [0, 1, 2, 3] |
| | image = cv2.rectangle( |
| | image, |
| | |
| | (int(box[box_indexes[0]]), int(box[box_indexes[1]])), |
| | (int(box[box_indexes[2]]), int(box[box_indexes[3]])), |
| | (255, 0, 0), |
| | 2, |
| | ) |
| | return image |
| |
|
| | def get_boxes_key(self): |
| | return "bboxes" if self.bboxes is True else "boxes" |
| |
|
| | def __getitem__(self, index): |
| | num_box, boxes = self.load_boxes( |
| | index |
| | ) |
| | img = self.load_tray_image(index) |
| |
|
| | if num_box > 0: |
| | boxes = torch.as_tensor(boxes, dtype=torch.float32) |
| | else: |
| | |
| | boxes = torch.zeros((0, 4), dtype=torch.float32) |
| |
|
| | image_id = torch.tensor([index]) |
| | labels = torch.ones((num_box,), dtype=torch.int64) |
| | target = { |
| | self.get_boxes_key(): boxes, |
| | "labels": labels, |
| | "image_id": image_id, |
| | "area": torch.as_tensor( |
| | (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]), |
| | dtype=torch.float32, |
| | ), |
| | "iscrowd": torch.zeros((num_box,), dtype=torch.int64), |
| | "img_size": torch.tensor([self.height, self.width]), |
| | "img_scale": torch.tensor([1.0]), |
| | } |
| |
|
| | if self.transforms is not None: |
| | sample = { |
| | "image": img, |
| | "bboxes": target[self.get_boxes_key()], |
| | "labels": labels, |
| | } |
| | sample = self.transforms(**sample) |
| | img = sample["image"] |
| | if num_box > 0: |
| | |
| | boxes = np.array(sample["bboxes"]) |
| | |
| | if self.yxyx is True: |
| | boxes[:, [0, 1, 2, 3]] = boxes[:, [1, 0, 3, 2]] |
| | |
| | target[self.get_boxes_key()] = torch.as_tensor( |
| | boxes, dtype=torch.float32 |
| | ) |
| | else: |
| | target[self.get_boxes_key()] = torch.zeros((0, 4), dtype=torch.float32) |
| | else: |
| | img = transforms.ToTensor()(img) |
| | if self.return_id is True: |
| | return img, target, image_id |
| | else: |
| | return img, target |
| |
|
| |
|
| | def collate_fn(batch): |
| | images, targets = tuple(zip(*batch)) |
| | images = torch.stack(images) |
| | images = images.float() |
| |
|
| | boxes = [target["boxes"].float() for target in targets] |
| | labels = [target["labels"].float() for target in targets] |
| |
|
| | return images, targets |
| |
|
| |
|
| | def find_best_lr(model, default_root_dir=cc.path_to_chk_detector): |
| | |
| | trainer = Trainer( |
| | default_root_dir=default_root_dir, |
| | auto_lr_find=True, |
| | accelerator="gpu", |
| | callbacks=[RichProgressBar()], |
| | ) |
| |
|
| | |
| | trainer.tune(model) |
| |
|
| | return model.learning_rate |
| |
|
| |
|
| | class LeafDiskDetector(pl.LightningModule): |
| | def __init__( |
| | self, |
| | batch_size: int, |
| | learning_rate: float, |
| | max_epochs: int, |
| | image_factor: int, |
| | train_data: pd.DataFrame, |
| | val_data: pd.DataFrame, |
| | test_data: pd.DataFrame, |
| | augmentations_kinds: list = ["resize", "train", "to_tensor"], |
| | augmentations_params: dict = {"gamma": (60, 180)}, |
| | num_workers: int = 0, |
| | accumulate_grad_batches: int = 3, |
| | selected_device: str = g_device, |
| | optimizer: str = "adam", |
| | scheduler: str = None, |
| | scheduler_params: dict = {}, |
| | ): |
| | super().__init__() |
| |
|
| | self.model_name = "ldd" |
| |
|
| | |
| | self.batch_size = batch_size |
| | self.selected_device = selected_device |
| | self.learning_rate = learning_rate |
| | self.num_workers = num_workers |
| | self.max_epochs = max_epochs |
| | self.accumulate_grad_batches = accumulate_grad_batches |
| |
|
| | |
| | self.train_data = train_data |
| | self.val_data = val_data |
| | self.test_data = test_data |
| |
|
| | |
| | self.optimizer = optimizer |
| | self.scheduler = scheduler |
| | self.scheduler_params = scheduler_params |
| |
|
| | |
| | self.image_factor = image_factor |
| | self.augmentations_kinds = augmentations_kinds |
| | self.augmentations_params = augmentations_params |
| |
|
| | self.train_augmentations = get_augmentations( |
| | image_size=self.image_factor, |
| | kinds=self.augmentations_kinds, |
| | **self.augmentations_params, |
| | ) |
| |
|
| | self.val_augmentations = get_augmentations( |
| | image_size=self.image_factor, |
| | kinds=["resize", "to_tensor"], |
| | **self.augmentations_params, |
| | ) |
| |
|
| | |
| | self.encoder = fasterrcnn_resnet50_fpn_v2( |
| | weights=FasterRCNN_ResNet50_FPN_V2_Weights |
| | ) |
| | num_classes = 2 |
| | |
| | in_features = self.encoder.roi_heads.box_predictor.cls_score.in_features |
| | |
| | self.encoder.roi_heads.box_predictor = FastRCNNPredictor( |
| | in_features, num_classes |
| | ) |
| |
|
| | self.save_hyperparameters() |
| |
|
| | def hr_desc(self): |
| | table = Table(title=f"{self.model_name} params & values") |
| | table.add_column("Param", justify="right", style="bold", no_wrap=True) |
| | table.add_column("Value") |
| |
|
| | def add_pairs(table_, attributes: list) -> None: |
| | for a in attributes: |
| | try: |
| | table_.add_row(a, Pretty(getattr(self, a))) |
| | except: |
| | pass |
| |
|
| | add_pairs( |
| | table, |
| | ["model_name", "batch_size", "num_workers", "accumulate_grad_batches"], |
| | ) |
| | table.add_row("image_width", Pretty(self.train_augmentations[0].width)) |
| | table.add_row("image_height", Pretty(self.train_augmentations[0].height)) |
| | add_pairs( |
| | table, |
| | ["image_factor", "augmentations_kinds", "augmentations_params"], |
| | ) |
| |
|
| | add_pairs( |
| | table, |
| | ["learning_rate", "optimizer", "scheduler", "scheduler_params"], |
| | ) |
| |
|
| | for name, df in zip( |
| | ["train", "val", "test"], |
| | [self.train_data, self.val_data, self.test_data], |
| | ): |
| | table.add_row( |
| | name, |
| | Pretty( |
| | f"shape: {str(df.shape)}, images: {len(df.plate_name.unique())}" |
| | ), |
| | ) |
| |
|
| | Console().print(table) |
| |
|
| | def configure_optimizers(self): |
| | |
| | if self.optimizer == "adam": |
| | optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate) |
| | elif self.optimizer == "sgd": |
| | optimizer = torch.optim.SGD(self.parameters(), lr=self.learning_rate) |
| | else: |
| | optimizer = None |
| |
|
| | |
| | if self.scheduler == "cycliclr": |
| | scheduler = torch.optim.lr_scheduler.CyclicLR( |
| | optimizer, |
| | base_lr=self.learning_rate, |
| | max_lr=0.01, |
| | step_size_up=100, |
| | mode=self.scheduler_mode, |
| | ) |
| | elif self.scheduler == "steplr": |
| | self.scheduler_params["optimizer"] = optimizer |
| | scheduler = torch.optim.lr_scheduler.StepLR(**self.scheduler_params) |
| | self.scheduler_params.pop("optimizer") |
| | elif self.scheduler == "plateau": |
| | scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( |
| | optimizer, |
| | mode="min", |
| | factor=0.2, |
| | patience=10, |
| | min_lr=1e-6, |
| | ) |
| | scheduler = {"scheduler": scheduler, "monitor": "val_loss"} |
| | else: |
| | scheduler = None |
| | if scheduler is None: |
| | return optimizer |
| | else: |
| | return [optimizer], [scheduler] |
| |
|
| | def train_dataloader(self): |
| | return DataLoader( |
| | LeafDiskDetectorDataset( |
| | csv=self.train_data, |
| | transform=self.train_augmentations, |
| | bboxes=False, |
| | ), |
| | batch_size=self.batch_size, |
| | shuffle=True, |
| | num_workers=self.num_workers, |
| | collate_fn=collate_fn, |
| | pin_memory=True, |
| | ) |
| |
|
| | def val_dataloader(self): |
| | return DataLoader( |
| | LeafDiskDetectorDataset( |
| | csv=self.train_data, |
| | transform=self.val_augmentations, |
| | bboxes=False, |
| | ), |
| | batch_size=self.batch_size, |
| | num_workers=self.num_workers, |
| | collate_fn=collate_fn, |
| | pin_memory=True, |
| | ) |
| |
|
| | def test_dataloader(self): |
| | return DataLoader( |
| | LeafDiskDetectorDataset( |
| | csv=self.train_data, |
| | transform=self.val_augmentations, |
| | bboxes=False, |
| | ), |
| | batch_size=self.batch_size, |
| | num_workers=self.num_workers, |
| | collate_fn=collate_fn, |
| | pin_memory=True, |
| | ) |
| |
|
| | def forward(self, x): |
| | return self.encoder(x) |
| |
|
| | def step_(self, batch, batch_index): |
| | x, y = batch |
| | self.train() |
| | loss_dict = self.encoder(x, y) |
| | return sum(loss for loss in loss_dict.values()) |
| |
|
| | def training_step(self, batch, batch_idx): |
| | loss = self.step_(batch=batch, batch_index=batch_idx) |
| | self.log( |
| | "train_loss", loss, on_step=True, prog_bar=True, batch_size=self.batch_size |
| | ) |
| | return loss |
| |
|
| | def validation_step(self, batch, batch_idx): |
| | loss = self.step_(batch=batch, batch_index=batch_idx) |
| | self.log( |
| | "val_loss", |
| | loss, |
| | on_epoch=True, |
| | on_step=False, |
| | prog_bar=True, |
| | batch_size=self.batch_size, |
| | ) |
| | self.log("train_loss", loss) |
| | return loss |
| |
|
| | def test_step(self, batch, batch_idx): |
| | loss = self.step_( |
| | batch=batch, batch_index=batch_idx, batch_size=self.batch_size |
| | ) |
| | self.log("test_loss", loss) |
| | return loss |
| |
|
| | def prepare_bboxes( |
| | self, |
| | image_name, |
| | score_threshold=0.90, |
| | ar_threshold=1.5, |
| | size_threshold=0.30, |
| | ): |
| | augs = get_augmentations( |
| | image_size=self.image_factor, |
| | kinds=["resize", "to_tensor"], |
| | inferrence=True, |
| | **self.augmentations_params, |
| | ) |
| | image = load_tray_image(image_name=image_name) |
| |
|
| | self.to(g_device) |
| | self.eval() |
| | predictions = self(augs(image=image)["image"].to(g_device).unsqueeze(0)) |
| |
|
| | boxes = predictions[0]["boxes"].detach().to("cpu").numpy() |
| | scores = predictions[0]["scores"].detach().to("cpu").numpy() |
| |
|
| | filtered_predictions = [ |
| | [box[i] for i in range(4)] |
| | for box, score in zip(boxes, scores) |
| | if score > score_threshold |
| | ] |
| |
|
| | restore_size = A.Compose( |
| | [A.Resize(width=image.shape[1], height=image.shape[0])], |
| | |
| | bbox_params={"format": "pascal_voc", "label_fields": ["labels"]}, |
| | ) |
| |
|
| | sample = { |
| | "image": image, |
| | "bboxes": filtered_predictions, |
| | "labels": [1 for _ in range(len(filtered_predictions))], |
| | } |
| | sample = restore_size(**sample) |
| |
|
| | resized_predictions = sample["bboxes"] |
| |
|
| | from siuba import _, filter, mutate |
| |
|
| | boxes = ( |
| | pd.DataFrame(data=resized_predictions, columns=["x1", "y1", "x2", "y2"]) |
| | >> mutate( |
| | x1=_.x1 * image.shape[1] / augs[0].width, |
| | y1=_.y1 * image.shape[0] / augs[0].height, |
| | x2=_.x2 * image.shape[1] / augs[0].width, |
| | y2=_.y2 * image.shape[0] / augs[0].height, |
| | ) |
| | >> mutate(width=_.x2 - _.x1, height=_.y2 - _.y1) |
| | >> mutate(cx=(_.x1 + _.x2) / 2, cy=(_.y1 + _.y2) / 2) |
| | >> mutate(area=_.width * _.height) |
| | >> mutate(ar=_.width / _.height) |
| | ) |
| | boxes.insert( |
| | 0, |
| | "file_name", |
| | image_name.name if isinstance(image_name, Path) else image_name, |
| | ) |
| | boxes["max_size"] = boxes[["width", "height"]].max(axis=1) |
| |
|
| | ar_boxes = ( |
| | boxes |
| | >> filter(_.width / _.height < ar_threshold) |
| | >> filter(_.height / _.width < ar_threshold) |
| | ) |
| |
|
| | return ar_boxes[ar_boxes.area > ar_boxes.area.max() * size_threshold] |
| |
|
| | @staticmethod |
| | def init_cols(bboxes): |
| | bboxes = bboxes.copy() |
| |
|
| | |
| | X = np.reshape(bboxes.cx.to_list(), (-1, 1)) |
| | ms = MeanShift(bandwidth=100, bin_seeding=True) |
| | ms.fit(X) |
| | cols = ms.predict(X) |
| | bboxes["col"] = cols |
| |
|
| | bboxes = bboxes.sort_values("cx") |
| | bboxes["mean_cx"] = ( |
| | bboxes.groupby("col").transform("mean", numeric_only=True).cx |
| | ) |
| | bboxes = bboxes.sort_values("mean_cx") |
| | for i, val in enumerate(bboxes.mean_cx.unique()): |
| | bboxes.loc[bboxes["mean_cx"] == val, "col"] = i |
| |
|
| | |
| | bboxes = bboxes.sort_values("cy") |
| | X = np.reshape(bboxes.cy.to_list(), (-1, 1)) |
| | ms = MeanShift(bandwidth=100, bin_seeding=True) |
| | ms.fit(X) |
| | rows = ms.predict(X) |
| | bboxes["row"] = rows |
| |
|
| | bboxes = bboxes.sort_values("cy") |
| | bboxes["mean_cy"] = ( |
| | bboxes.groupby("row").transform("mean", numeric_only=True).cy |
| | ) |
| | bboxes = bboxes.sort_values("mean_cy") |
| | for i, val in zip(["a", "b", "c"], bboxes.mean_cy.unique()): |
| | bboxes.loc[bboxes["mean_cy"] == val, "row"] = i |
| |
|
| | bboxes = bboxes.sort_values("cx") |
| |
|
| | return bboxes |
| |
|
| | @staticmethod |
| | def finalize_indexing(bboxes, image_name): |
| | bboxes = bboxes.copy() |
| | bboxes = bboxes.sort_values("cx") |
| | labels_unique = bboxes.col.unique() |
| | labels = bboxes.col.to_numpy() |
| | if len(labels_unique) < 4: |
| | inc_labels = [[i, 0] for i in range(len(labels_unique))] |
| | max_width = bboxes.max_size.max() |
| |
|
| | |
| | |
| | left_most_line = get_first_vert_line(image_name=image_name) |
| | if left_most_line is not None: |
| | left_most_point = bboxes.x1.min() - min( |
| | left_most_line[0], left_most_line[1] |
| | ) |
| | else: |
| | left_most_point = bboxes.x1.min() - (max_width / 2) |
| | i = 1 |
| | while left_most_point > i * 1.1 * max_width: |
| | inc_labels[0][1] += 1 |
| | i += 1 |
| |
|
| | |
| | prev_min_min = bboxes[bboxes.col == 0].x2.max() |
| |
|
| | for label in labels_unique[1:]: |
| | current_label_contours = bboxes[bboxes.col == label] |
| | max_width = current_label_contours.max_size.max() |
| | min_left = current_label_contours.x1.min() |
| | i = 1 |
| | while min_left - prev_min_min > i * 1.1 * max_width: |
| | inc_labels[label][1] += 1 |
| | i += 1 |
| | prev_min_min = min_left + max_width |
| |
|
| | for pos, inc in reversed(inc_labels): |
| | labels[labels >= pos] += inc |
| |
|
| | bboxes["col"] = labels |
| |
|
| | labels_unique = np.unique(labels) |
| |
|
| | bboxes["col"] += 1 |
| |
|
| | return bboxes.sort_values(["row", "col"]) |
| |
|
| | def index_plate( |
| | self, |
| | image_name, |
| | score_threshold=0.90, |
| | ar_threshold=1.5, |
| | size_threshold=0.50, |
| | ): |
| | bboxes = self.prepare_bboxes( |
| | image_name=image_name, |
| | score_threshold=score_threshold, |
| | ar_threshold=ar_threshold, |
| | size_threshold=size_threshold, |
| | ) |
| | if bboxes.shape[0] == 0: |
| | return bboxes |
| |
|
| | bboxes = self.init_cols(bboxes=bboxes) |
| | bboxes = self.finalize_indexing(bboxes=bboxes, image_name=image_name) |
| |
|
| | return bboxes |
| |
|
| |
|
| | def test_augmentations( |
| | df, |
| | image_size, |
| | kinds: list = ["resize", "train"], |
| | row_count=2, |
| | col_count=4, |
| | **aug_params, |
| | ): |
| | src_dataset = LeafDiskDetectorDataset( |
| | csv=df, |
| | transform=get_augmentations( |
| | image_size=image_size, kinds=["resize"], **aug_params |
| | ), |
| | ) |
| |
|
| | test_dataset = LeafDiskDetectorDataset( |
| | csv=df, |
| | transform=get_augmentations(image_size=image_size, kinds=kinds, **aug_params), |
| | ) |
| |
|
| | image_name = df.sample(n=1).iloc[0].plate_name |
| |
|
| | images = [(src_dataset.draw_image_with_boxes(plate_name=image_name), "Source")] + [ |
| | (test_dataset.draw_image_with_boxes(plate_name=image_name), "Augmented") |
| | for i in range(row_count * col_count - 1) |
| | ] |
| |
|
| | make_patches_grid( |
| | images=images, |
| | row_count=row_count, |
| | col_count=col_count, |
| | figsize=(col_count * 4, row_count * 3), |
| | ) |
| |
|
| |
|
| | def get_file_path_from_row(row, path_to_patches: Path): |
| | return path_to_patches.joinpath(row.file_name) |
| |
|
| |
|
| | def get_fast_images( |
| | row, path_to_patches, percent_radius: float = 1.0, add_process_image: bool = False |
| | ): |
| | d = {} |
| | try: |
| | d["leaf_disc_box"] = get_leaf_disk_wbb( |
| | row.file_name, row, image_path=get_file_path_from_row(row, path_to_patches) |
| | ) |
| | except: |
| | pass |
| | try: |
| | d["segmented_leaf_disc"] = get_fast_segment_disk( |
| | image_name=row.file_name, |
| | bboxes=row, |
| | percent_radius=percent_radius, |
| | image_path=get_file_path_from_row(row, path_to_patches), |
| | ) |
| | except: |
| | pass |
| | try: |
| | d["leaf_disc_patch"] = get_fast_leaf_disk_patch( |
| | image_name=row.file_name, |
| | bboxes=row, |
| | percent_radius=percent_radius, |
| | image_path=get_file_path_from_row(row, path_to_patches), |
| | ) |
| | except: |
| | pass |
| | if add_process_image is True: |
| | try: |
| | d["process_image"] = draw_fast_bb_to_patch_process( |
| | image_name=row.file_name, |
| | bboxes=row, |
| | percent_radius=percent_radius, |
| | image_path=get_file_path_from_row(row, path_to_patches), |
| | ) |
| | except: |
| | pass |
| |
|
| | return d |
| |
|
| |
|
| | def save_images(row: pd.Series, images_data: dict, errors: dict, paths: dict): |
| | fn = f"{Path(row.file_name).stem}_{row.row}_{int(row.col)}.png" |
| | for k, image in images_data.items(): |
| | if k not in paths: |
| | continue |
| | path_to_image = paths[k].joinpath(fn) |
| | if image is not None: |
| | if path_to_image.is_file() is False: |
| | cv2.imwrite(str(path_to_image), cv2.cvtColor(image, cv2.COLOR_RGB2BGR)) |
| | elif errors is not None: |
| | errors[k].append(row.file_name) |
| | else: |
| | pass |
| |
|
| |
|
| | def handle_bbox( |
| | row: pd.Series, |
| | paths: dict, |
| | errors: dict = None, |
| | percent_radius: float = 1.0, |
| | add_process_image: bool = False, |
| | ): |
| | save_images( |
| | row=row, |
| | images_data=get_fast_images( |
| | row=row, |
| | percent_radius=percent_radius, |
| | add_process_image=add_process_image, |
| | path_to_patches=paths["plates"], |
| | ), |
| | errors=errors, |
| | paths=paths, |
| | ) |
| |
|