Spaces:
Build error
Build error
| import datetime | |
| import logging | |
| import os | |
| import platform | |
| import subprocess | |
| import time | |
| from pathlib import Path | |
| import re | |
| import glob | |
| import random | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| import torchvision | |
| logger = logging.getLogger(__name__) | |
def git_describe(path=Path(__file__).parent):  # path must be a directory
    """Return a human-readable git description of the repository at ``path``.

    Example output: ``v5.0-5-g3e25f1e`` (see https://git-scm.com/docs/git-describe).

    Args:
        path: Directory handed to ``git -C``.

    Returns:
        The description string, or ``''`` when ``path`` is not a git
        repository, git exits with an error, or git cannot be launched.
    """
    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact instead of letting a shell re-split them.
    cmd = ['git', '-C', str(path), 'describe', '--tags', '--long', '--always']
    try:
        raw = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: not a git repository / git failed.
        # OSError: the git executable could not be started.
        return ''
    return raw.decode().strip()  # drop the trailing newline (LF or CRLF)
def date_modified(path=__file__):
    """Return the modification date of ``path`` as ``'year-month-day'``.

    The numbers are not zero-padded, e.g. ``'2021-3-26'``. The timestamp is
    interpreted in local time.
    """
    mtime = Path(path).stat().st_mtime
    stamp = datetime.datetime.fromtimestamp(mtime)
    return '-'.join(str(part) for part in (stamp.year, stamp.month, stamp.day))
def select_device(device='', batch_size=None):
    """Pick the torch device to run on and log a one-off summary banner.

    Args:
        device: ``'cpu'``, a single index such as ``'0'``, or a comma
            separated list such as ``'0,1,2,3'``. An empty string uses CUDA
            when available and the CPU otherwise.
        batch_size: Optional batch size; with more than one visible GPU it
            must be a multiple of the GPU count.

    Returns:
        ``torch.device('cuda:0')`` when CUDA is used, else ``torch.device('cpu')``.

    Raises:
        AssertionError: A non-CPU device was requested but CUDA is not
            available, or ``batch_size`` is not divisible by the GPU count.
    """
    banner = f'YOLOPv2 🚀 {git_describe() or date_modified()} torch {torch.__version__} '  # string
    want_cpu = device.lower() == 'cpu'
    if want_cpu:
        # Hiding every GPU forces torch.cuda.is_available() to be False.
        os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
    elif device:
        os.environ['CUDA_VISIBLE_DEVICES'] = device
        assert torch.cuda.is_available(), f'CUDA unavailable, invalid device {device} requested'

    use_cuda = not want_cpu and torch.cuda.is_available()
    if not use_cuda:
        banner += 'CPU\n'
    else:
        gpu_count = torch.cuda.device_count()
        if gpu_count > 1 and batch_size:
            assert batch_size % gpu_count == 0, f'batch-size {batch_size} not multiple of GPU count {gpu_count}'
        indent = ' ' * len(banner)  # aligns the 2nd+ GPU lines under the first
        device_ids = device.split(',') if device else range(gpu_count)
        for idx, dev_id in enumerate(device_ids):
            props = torch.cuda.get_device_properties(idx)
            lead = '' if idx == 0 else indent
            banner += f"{lead}CUDA:{dev_id} ({props.name}, {props.total_memory / 1024 ** 2}MB)\n"  # bytes to MB

    # On Windows the banner is reduced to ASCII before logging (drops the emoji).
    if platform.system() == 'Windows':
        logger.info(banner.encode().decode('ascii', 'ignore'))
    else:
        logger.info(banner)
    return torch.device('cuda:0' if use_cuda else 'cpu')
def time_synchronized():
    """Return ``time.time()`` after waiting for pending CUDA work, if any.

    When CUDA is available ``torch.cuda.synchronize()`` is called first, so
    the timestamp is taken only after queued GPU kernels have finished.
    """
    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.synchronize()
    now = time.time()
    return now
def plot_one_box(x, img, color=None, label=None, line_thickness=3):
    """Draw one bounding box on ``img`` in place.

    The rectangle is always drawn with the fixed colour ``[0, 255, 255]``,
    thickness 2 and anti-aliased lines.

    Args:
        x: Box corners; ``x[0], x[1]`` is one corner and ``x[2], x[3]`` the
            opposite one. Values are truncated with ``int()``.
        img: Image array that is drawn on directly.
        color: Accepted for call compatibility; not used for drawing.
        label: Accepted for call compatibility; no text is drawn.
        line_thickness: Accepted for call compatibility; not used for drawing.

    NOTE(review): the previous body derived a line thickness, a random colour
    and a label text size from these arguments but never used any of them.
    Those dead computations (including the global ``random`` draw) were
    removed; the drawn output is unchanged.
    """
    corner_a = (int(x[0]), int(x[1]))
    corner_b = (int(x[2]), int(x[3]))
    cv2.rectangle(img, corner_a, corner_b, [0, 255, 255], thickness=2, lineType=cv2.LINE_AA)
class SegmentationMetric(object):
    """Accumulate a confusion matrix and derive segmentation metrics from it.

    ``confusionMatrix`` is ``numClass x numClass``. Rows index the label
    (ground-truth) class and columns the predicted class, because each pixel
    is counted at flat index ``numClass * label + prediction``. For two
    classes this reads::

        [[TN, FP],
         [FN, TP]]
    """

    def __init__(self, numClass):
        self.numClass = numClass
        self.confusionMatrix = np.zeros((self.numClass,) * 2)

    def _iou_per_class(self):
        """Return per-class IoU = TP / (TP + FP + FN); 0 where the union is empty."""
        intersection = np.diag(self.confusionMatrix)
        union = self.confusionMatrix.sum(axis=1) + self.confusionMatrix.sum(axis=0) - intersection
        iou = np.zeros_like(intersection, dtype=float)
        # Divide only where defined, so empty classes give 0 without a 0/0 warning.
        np.divide(intersection, union, out=iou, where=union > 0)
        return iou

    def pixelAccuracy(self):
        """Overall pixel accuracy: (TP + TN) / (TP + TN + FP + FN).

        Returns NaN when no pixels have been accumulated.
        """
        acc = np.diag(self.confusionMatrix).sum() / self.confusionMatrix.sum()
        return acc

    def lineAccuracy(self):
        """Diagonal over row sum (label totals) for class index 1."""
        Acc = np.diag(self.confusionMatrix) / (self.confusionMatrix.sum(axis=1) + 1e-12)
        return Acc[1]

    def classPixelAccuracy(self):
        """Per-class diagonal over column sum, i.e. TP / (TP + FP) (precision)."""
        classAcc = np.diag(self.confusionMatrix) / (self.confusionMatrix.sum(axis=0) + 1e-12)
        return classAcc

    def meanPixelAccuracy(self):
        """Mean of ``classPixelAccuracy()`` over classes, ignoring NaNs."""
        classAcc = self.classPixelAccuracy()
        meanAcc = np.nanmean(classAcc)
        return meanAcc

    def meanIntersectionOverUnion(self):
        """Mean IoU over all classes; classes with an empty union count as 0."""
        mIoU = np.nanmean(self._iou_per_class())
        return mIoU

    def IntersectionOverUnion(self):
        """IoU of class index 1; 0 when that class has an empty union."""
        return self._iou_per_class()[1]

    def genConfusionMatrix(self, imgPredict, imgLabel):
        """Build the confusion matrix for one prediction/label pair.

        Pixels whose label lies outside ``[0, numClass)`` are ignored.
        Inputs are converted with ``np.asarray`` and widened to int64 before
        the flat index is formed, so narrow integer dtypes such as uint8
        cannot overflow in ``numClass * label + prediction``.
        """
        labels = np.asarray(imgLabel)
        predictions = np.asarray(imgPredict)
        mask = (labels >= 0) & (labels < self.numClass)
        flat_index = self.numClass * labels[mask].astype(np.int64) + predictions[mask].astype(np.int64)
        count = np.bincount(flat_index, minlength=self.numClass ** 2)
        confusionMatrix = count.reshape(self.numClass, self.numClass)
        return confusionMatrix

    def Frequency_Weighted_Intersection_over_Union(self):
        """FWIoU = sum over classes of [(TP + FN) / total] * [TP / (TP + FP + FN)].

        Only classes that occur in the labels contribute. Returns 0.0 when
        nothing has been accumulated.
        """
        total = self.confusionMatrix.sum()
        if total == 0:
            return 0.0
        freq = self.confusionMatrix.sum(axis=1) / total
        iou = self._iou_per_class()
        present = freq > 0
        FWIoU = (freq[present] * iou[present]).sum()
        return FWIoU

    def addBatch(self, imgPredict, imgLabel):
        """Add one batch to the running confusion matrix (shapes must match)."""
        assert imgPredict.shape == imgLabel.shape
        self.confusionMatrix += self.genConfusionMatrix(imgPredict, imgLabel)

    def reset(self):
        """Clear the accumulated confusion matrix."""
        self.confusionMatrix = np.zeros((self.numClass, self.numClass))
class AverageMeter(object):
    """Track the latest value, weighted sum, count and running average."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero every statistic."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the average."""
        self.val = val
        self.sum += val * n
        self.count += n
        if self.count != 0:
            self.avg = self.sum / self.count
        else:
            # No samples: report 0 instead of dividing by zero.
            self.avg = 0
| def _make_grid(nx=20, ny=20): | |
| yv, xv = torch.meshgrid([torch.arange(ny), torch.arange(nx)]) | |
| return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float() | |
def split_for_trace_model(pred=None, anchor_grid=None):
    """Decode three raw detection-head outputs into one prediction tensor.

    Args:
        pred: Sequence of three tensors shaped ``(bs, 3 * no, ny, nx)``, one
            per level, paired with strides 8, 16 and 32. The sequence is no
            longer modified, so tuples are accepted as well as lists.
        anchor_grid: Sequence of three anchor tensors, each multiplied into
            the width/height terms of its level (must broadcast against
            ``(bs, 3, ny, nx, 2)``).

    Returns:
        Tensor of shape ``(bs, total_boxes, no)`` with decoded xy and wh in
        the first four channels and sigmoid-activated remaining channels.
        ``no`` is inferred as ``channels // 3`` (85 for a 255-channel head).
    """
    strides = (8, 16, 32)
    num_anchors = 3
    decoded = []
    for level, stride in enumerate(strides):
        raw = pred[level]
        bs, channels, ny, nx = raw.shape
        num_outputs = channels // num_anchors
        # (bs, na * no, ny, nx) -> (bs, na, ny, nx, no)
        raw = raw.view(bs, num_anchors, num_outputs, ny, nx).permute(0, 1, 3, 4, 2).contiguous()
        y = raw.sigmoid()
        grid = _make_grid(nx, ny).to(raw.device)
        y[..., 0:2] = (y[..., 0:2] * 2. - 0.5 + grid) * stride  # xy
        y[..., 2:4] = (y[..., 2:4] * 2) ** 2 * anchor_grid[level]  # wh
        decoded.append(y.view(bs, -1, num_outputs))
    return torch.cat(decoded, 1)
def show_seg_result(img, result, palette=None, is_demo=False):
    """Blend a segmentation overlay into ``img`` in place (50/50 mix).

    Args:
        img: Image array of shape ``(H, W, 3)``; modified directly. The
            overlay colours are channel-reversed before blending, matching
            a BGR image.
        result: With ``is_demo=False`` a ``(H, W)`` class-index map with
            values 0..2. With ``is_demo=True`` a pair of ``(H, W)`` masks
            where value 1 marks the region; the second mask is painted
            over the first.
        palette: Optional 3x3 colours, one row per class, used only when
            ``is_demo`` is False. Defaults to black / green / red.
        is_demo: Selects the two-mask input form described above.

    Returns:
        None. Pixels whose overlay colour is all zero are left untouched.

    Raises:
        AssertionError: ``palette`` is not of shape ``(3, 3)``.
    """
    if palette is None:
        # Fixed default colours; no random draw, so the global NumPy RNG
        # state is left alone.
        palette = [[0, 0, 0], [0, 255, 0], [255, 0, 0]]
    palette = np.array(palette)
    assert palette.shape == (3, 3)  # (len(classes), channels)

    if not is_demo:
        color_seg = np.zeros((result.shape[0], result.shape[1], 3), dtype=np.uint8)
        for label, color in enumerate(palette):
            color_seg[result == label, :] = color
    else:
        color_seg = np.zeros((result[0].shape[0], result[0].shape[1], 3), dtype=np.uint8)
        color_seg[result[0] == 1] = [0, 255, 0]
        color_seg[result[1] == 1] = [255, 0, 0]

    # convert to BGR
    color_seg = color_seg[..., ::-1]
    # A pixel is overlaid when any channel is non-zero (same as mean != 0 for uint8).
    overlay = color_seg.any(axis=2)
    img[overlay] = img[overlay] * 0.5 + color_seg[overlay] * 0.5
    return
def increment_path(path, exist_ok=True, sep=''):
    """Return ``path``, or a numbered variant of it when it already exists.

    Args:
        path: Candidate file or directory path.
        exist_ok: When True (the default) ``path`` is returned unchanged even
            if it exists.
        sep: Text placed between the path and the number.

    Returns:
        ``str(path)`` if the path does not exist or ``exist_ok`` is True.
        Otherwise ``f"{path}{sep}{n}"``, where ``n`` is one more than the
        largest number found on existing siblings matching
        ``{stem}{sep}<digits>``, or 2 when no numbered sibling exists.
    """
    path = Path(path)  # os-agnostic
    if exist_ok or not path.exists():
        return str(path)

    base = str(path)
    # Search only from the final path component onwards, so digits in a
    # parent directory name (e.g. ".../exp1/runs/exp") are never picked up.
    tail_start = len(base) - len(path.name)
    # Escape stem and sep so regex metacharacters in them match literally.
    numbered = re.compile(re.escape(path.stem) + re.escape(sep) + r"(\d+)")

    indices = []
    # Escape glob metacharacters too; only the trailing '*' is a wildcard.
    for candidate in glob.glob(f"{glob.escape(base)}{glob.escape(sep)}*"):
        found = numbered.search(candidate[tail_start:])
        if found:
            indices.append(int(found.group(1)))

    n = max(indices) + 1 if indices else 2  # increment number
    return f"{path}{sep}{n}"  # update path
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
    """Map xyxy boxes from the ``img1_shape`` frame back to ``img0_shape``.

    ``coords`` is modified in place (padding removed, divided by the gain,
    then clipped to ``img0_shape``) and also returned.

    Args:
        img1_shape: ``(height, width)`` the boxes currently refer to.
        coords: Box array/tensor with x1, y1, x2, y2 in columns 0-3.
        img0_shape: ``(height, width)`` of the target image.
        ratio_pad: Optional precomputed values; the gain is read from
            ``ratio_pad[0][0]`` and the (x, y) padding from ``ratio_pad[1]``.
    """
    if ratio_pad is not None:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]
    else:
        # Derive gain and padding from the two shapes.
        net_h, net_w = img1_shape[0], img1_shape[1]
        src_h, src_w = img0_shape[0], img0_shape[1]
        gain = min(net_h / src_h, net_w / src_w)  # gain = old / new
        pad = (net_w - src_w * gain) / 2, (net_h - src_h * gain) / 2  # wh padding

    pad_x, pad_y = pad[0], pad[1]
    coords[:, [0, 2]] -= pad_x  # x padding
    coords[:, [1, 3]] -= pad_y  # y padding
    coords[:, :4] /= gain
    clip_coords(coords, img0_shape)
    return coords
def clip_coords(boxes, img_shape):
    """Clamp xyxy boxes in place to an image of shape ``(height, width)``.

    Columns 0 and 2 (x1, x2) are limited to ``[0, width]`` and columns 1 and
    3 (y1, y2) to ``[0, height]``. Nothing is returned.
    """
    height, width = img_shape[0], img_shape[1]
    upper_bounds = (width, height, width, height)  # x1, y1, x2, y2
    for column, upper in enumerate(upper_bounds):
        boxes[:, column].clamp_(0, upper)
def set_logging(rank=-1):
    """Configure root logging with a bare ``%(message)s`` format.

    Ranks -1 and 0 log at INFO; every other rank logs at WARN.
    """
    is_primary = rank in [-1, 0]
    level = logging.INFO if is_primary else logging.WARN
    logging.basicConfig(format="%(message)s", level=level)
def xywh2xyxy(x):
    """Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2].

    (x, y) is the box centre; (x1, y1) is the top-left and (x2, y2) the
    bottom-right corner. A copy is returned (``clone`` for torch tensors,
    ``np.copy`` otherwise); columns beyond the first four are carried over.
    """
    if isinstance(x, torch.Tensor):
        y = x.clone()
    else:
        y = np.copy(x)
    cx, cy = x[:, 0], x[:, 1]
    half_w, half_h = x[:, 2] / 2, x[:, 3] / 2
    y[:, 0] = cx - half_w  # top left x
    y[:, 1] = cy - half_h  # top left y
    y[:, 2] = cx + half_w  # bottom right x
    y[:, 3] = cy + half_h  # bottom right y
    return y
def xyxy2xywh(x):
    """Convert nx4 boxes from [x1, y1, x2, y2] to [x, y, w, h].

    (x1, y1) is the top-left and (x2, y2) the bottom-right corner; the
    result holds the centre followed by width and height. A copy is returned
    (``clone`` for torch tensors, ``np.copy`` otherwise).
    """
    if isinstance(x, torch.Tensor):
        y = x.clone()
    else:
        y = np.copy(x)
    left, top, right, bottom = x[:, 0], x[:, 1], x[:, 2], x[:, 3]
    y[:, 0] = (left + right) / 2  # x center
    y[:, 1] = (top + bottom) / 2  # y center
    y[:, 2] = right - left  # width
    y[:, 3] = bottom - top  # height
    return y
def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False,
                        labels=()):
    """Run Non-Maximum Suppression (NMS) on inference results.

    Args:
        prediction: Tensor indexed as ``(image, box, 5 + nc)`` where each box
            row is ``[x, y, w, h, obj_conf, cls_conf...]`` (centre format).
        conf_thres: Minimum objectness, and minimum final confidence
            (objectness * class confidence), for a box to be kept.
        iou_thres: IoU threshold passed to ``torchvision.ops.nms``.
        classes: Optional sequence of class indices to keep.
        agnostic: When True, boxes of different classes suppress each other.
        multi_label: When True and ``nc > 1``, one box may yield several
            detections, one per class above ``conf_thres``.
        labels: Optional per-image a-priori labels (rows of
            ``[cls, x, y, w, h]``) appended with confidence 1.0 before NMS.

    Returns:
        List with one ``(n, 6)`` tensor per image, rows being
        ``[x1, y1, x2, y2, conf, cls]``. Images without detections, or not
        reached before the time limit, get their own empty ``(0, 6)`` tensor.
    """
    nc = prediction.shape[2] - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Settings
    max_wh = 4096  # (pixels) per-class offset used to separate classes in NMS
    max_det = 300  # maximum number of detections per image
    max_nms = 30000  # maximum number of boxes into torchvision.ops.nms()
    time_limit = 10.0  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    # Loop-invariant: build the class filter tensor once.
    class_filter = None if classes is None else torch.tensor(classes, device=prediction.device)

    t = time.time()
    # One distinct empty tensor per image (no object shared between entries).
    output = [torch.zeros((0, 6), device=prediction.device) for _ in range(prediction.shape[0])]
    for xi, x in enumerate(prediction):  # image index, image inference
        x = x[xc[xi]]  # keep boxes above the objectness threshold

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            l = labels[xi]
            v = torch.zeros((len(l), nc + 5), device=x.device)
            v[:, :4] = l[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(l)), l[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box (center x, center y, width, height) to (x1, y1, x2, y2)
        box = xywh2xyxy(x[:, :4])

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1)
        else:  # best class only
            conf, j = x[:, 5:].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if class_filter is not None:
            x = x[(x[:, 5:6] == class_filter).any(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        elif n > max_nms:  # excess boxes
            x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence

        # Batched NMS: shifting boxes by class * max_wh keeps classes apart
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        if i.shape[0] > max_det:  # limit detections
            i = i[:max_det]
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if (time.time() - t) > time_limit:
            print(f'WARNING: NMS time limit {time_limit}s exceeded')
            break  # time limit exceeded

    return output
def box_iou(box1, box2):
    # https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py
    """
    Compute pairwise intersection-over-union (Jaccard index) of two box sets.
    Boxes are expected in (x1, y1, x2, y2) format.
    Arguments:
        box1 (Tensor[N, 4])
        box2 (Tensor[M, 4])
    Returns:
        iou (Tensor[N, M]): entry [i, j] is the IoU of box1[i] and box2[j]
    """
    area1 = (box1[:, 2] - box1[:, 0]) * (box1[:, 3] - box1[:, 1])  # (N,)
    area2 = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1])  # (M,)

    # Broadcast (N, 1, 2) against (M, 2) to get every pair's overlap corners.
    overlap_top_left = torch.max(box1[:, None, :2], box2[:, :2])
    overlap_bottom_right = torch.min(box1[:, None, 2:], box2[:, 2:])
    # Negative extents mean no overlap; clamp them to zero before multiplying.
    inter = (overlap_bottom_right - overlap_top_left).clamp(0).prod(2)

    union = area1[:, None] + area2 - inter
    return inter / union  # iou = inter / (area1 + area2 - inter)
class LoadImages:  # for inference
    """Iterate over image and video files, yielding model-ready frames.

    Each iteration returns ``(path, img, img0, cap)``: ``img0`` is the frame
    resized to 1280x720, ``img`` its letterboxed, RGB, channel-first and
    contiguous version, and ``cap`` the current ``cv2.VideoCapture`` (or
    None when no video was found). Images are served before videos.
    """

    def __init__(self, path, img_size=640, stride=32):
        """Collect supported files from a glob pattern, directory or file.

        Raises:
            FileNotFoundError: ``path`` is not a glob pattern and does not exist.
            AssertionError: No supported image or video file was found.
        """
        p = str(Path(path).absolute())  # os-agnostic absolute path
        if '*' in p:
            files = sorted(glob.glob(p, recursive=True))  # glob
        elif os.path.isdir(p):
            files = sorted(glob.glob(os.path.join(p, '*.*')))  # dir
        elif os.path.isfile(p):
            files = [p]  # files
        else:
            # Subclass of Exception, so existing ``except Exception`` callers still work.
            raise FileNotFoundError(f'ERROR: {p} does not exist')

        img_formats = ['bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff', 'dng', 'webp', 'mpo']  # acceptable image suffixes
        vid_formats = ['mov', 'avi', 'mp4', 'mpg', 'mpeg', 'm4v', 'wmv', 'mkv']  # acceptable video suffixes
        images = [x for x in files if x.split('.')[-1].lower() in img_formats]
        videos = [x for x in files if x.split('.')[-1].lower() in vid_formats]
        ni, nv = len(images), len(videos)

        self.img_size = img_size
        self.stride = stride
        self.files = images + videos
        self.nf = ni + nv  # number of files
        self.video_flag = [False] * ni + [True] * nv
        self.mode = 'image'
        if videos:
            self.new_video(videos[0])  # new video
        else:
            self.cap = None
        assert self.nf > 0, f'No images or videos found in {p}. ' \
                            f'Supported formats are:\nimages: {img_formats}\nvideos: {vid_formats}'

    def __iter__(self):
        self.count = 0
        return self

    def __next__(self):
        if self.count == self.nf:
            raise StopIteration
        path = self.files[self.count]

        if self.video_flag[self.count]:
            # Read video
            self.mode = 'video'
            ret_val, img0 = self.cap.read()
            # Advance through videos until a frame is read. A video whose
            # first frame cannot be read is skipped rather than passing
            # ``None`` on to the resize below.
            while not ret_val:
                self.count += 1
                self.cap.release()
                if self.count == self.nf:  # last video
                    raise StopIteration
                path = self.files[self.count]
                self.new_video(path)
                ret_val, img0 = self.cap.read()

            self.frame += 1
            print(f'video {self.count + 1}/{self.nf} ({self.frame}/{self.nframes}) {path}: ', end='')
        else:
            # Read image
            self.count += 1
            img0 = cv2.imread(path)  # BGR
            assert img0 is not None, 'Image Not Found ' + path

        # Every frame is first brought to a fixed 1280x720 size, then letterboxed.
        img0 = cv2.resize(img0, (1280, 720), interpolation=cv2.INTER_LINEAR)
        img = letterbox(img0, self.img_size, stride=self.stride)[0]

        # Convert
        img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, HWC to CHW
        img = np.ascontiguousarray(img)

        return path, img, img0, self.cap

    def new_video(self, path):
        """Open ``path`` as the current video and reset the frame counter."""
        self.frame = 0
        self.cap = cv2.VideoCapture(path)
        self.nframes = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    def __len__(self):
        return self.nf  # number of files
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    """Resize ``img`` with preserved aspect ratio and pad it with ``color``.

    Args:
        img: Image array whose first two dimensions are (height, width).
        new_shape: Target (height, width); a single int means a square.
        color: Border colour used for the padding.
        auto: Pad only up to the next multiple of ``stride`` (minimum
            rectangle) instead of the full target shape.
        scaleFill: Stretch to exactly ``new_shape`` without padding
            (only consulted when ``auto`` is False).
        scaleup: Allow enlarging; when False the image is only scaled down.
        stride: Stride used by the ``auto`` padding.

    Returns:
        ``(img, ratio, (dw, dh))``: the padded image, the (width, height)
        scale ratios, and the per-side padding in x and y.
    """
    shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)
    target_h, target_w = new_shape[0], new_shape[1]
    src_h, src_w = shape[0], shape[1]

    # Scale ratio (new / old)
    scale = min(target_h / src_h, target_w / src_w)
    if not scaleup:  # only scale down, do not scale up (for better test mAP)
        scale = min(scale, 1.0)

    # Compute padding
    ratio = scale, scale  # width, height ratios
    new_unpad = int(round(src_w * scale)), int(round(src_h * scale))
    dw = target_w - new_unpad[0]  # total width padding
    dh = target_h - new_unpad[1]  # total height padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (target_w, target_h)
        ratio = target_w / src_w, target_h / src_h  # width, height ratios

    # divide padding into 2 sides
    dw /= 2
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
    # The -0.1 / +0.1 nudges send an odd padding pixel to the bottom/right side.
    top = int(round(dh - 0.1))
    bottom = int(round(dh + 0.1))
    left = int(round(dw - 0.1))
    right = int(round(dw + 0.1))
    img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return img, ratio, (dw, dh)
def driving_area_mask(seg = None):
    """Turn segmentation logits into an integer class-index mask.

    Rows 12..371 of the third dimension are kept, the result is upsampled 2x
    with bilinear interpolation, and the index of the largest value along
    dimension 1 is taken per pixel. Returned as a squeezed int NumPy array.
    """
    cropped = seg[:, :, 12:372, :]
    upsampled = torch.nn.functional.interpolate(cropped, scale_factor=2, mode='bilinear')
    class_ids = torch.max(upsampled, 1)[1]  # [1] = indices of the per-pixel maximum
    return class_ids.int().squeeze().cpu().numpy()
def lane_line_mask(ll = None):
    """Turn lane-line scores into an integer mask by rounding.

    Rows 12..371 of the third dimension are kept, the result is upsampled 2x
    with bilinear interpolation, rounded to the nearest integer, and
    returned as a squeezed int NumPy array.
    """
    cropped = ll[:, :, 12:372, :]
    upsampled = torch.nn.functional.interpolate(cropped, scale_factor=2, mode='bilinear')
    rounded = torch.round(upsampled).squeeze(1)
    return rounded.int().squeeze().cpu().numpy()