|
|
import datetime |
|
|
import os |
|
|
import platform |
|
|
import random |
|
|
import subprocess |
|
|
import time |
|
|
from pathlib import Path |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import torch |
|
|
import torchvision |
|
|
|
|
|
|
|
|
def box_iou(box1, box2): |
|
|
|
|
|
""" |
|
|
Return intersection-over-union (Jaccard index) of boxes. |
|
|
Both sets of boxes are expected to be in (x1, y1, x2, y2) format. |
|
|
Arguments: |
|
|
box1 (Tensor[N, 4]) |
|
|
box2 (Tensor[M, 4]) |
|
|
Returns: |
|
|
iou (Tensor[N, M]): the NxM matrix containing the pairwise |
|
|
IoU values for every element in boxes1 and boxes2 |
|
|
""" |
|
|
|
|
|
def box_area(box): |
|
|
|
|
|
return (box[2] - box[0]) * (box[3] - box[1]) |
|
|
|
|
|
area1 = box_area(box1.T) |
|
|
area2 = box_area(box2.T) |
|
|
|
|
|
|
|
|
inter = (torch.min(box1[:, None, 2:], box2[:, 2:]) - torch.max(box1[:, None, :2], box2[:, :2])).clamp(0).prod(2) |
|
|
return inter / (area1[:, None] + area2 - inter) |
|
|
|
|
|
|
|
|
def xywh2xyxy(x): |
|
|
|
|
|
y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) |
|
|
y[:, 0] = x[:, 0] - x[:, 2] / 2 |
|
|
y[:, 1] = x[:, 1] - x[:, 3] / 2 |
|
|
y[:, 2] = x[:, 0] + x[:, 2] / 2 |
|
|
y[:, 3] = x[:, 1] + x[:, 3] / 2 |
|
|
return y |
|
|
|
|
|
|
|
|
def letterbox_for_img(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True): |
|
|
|
|
|
shape = img.shape[:2] |
|
|
if isinstance(new_shape, int): |
|
|
new_shape = (new_shape, new_shape) |
|
|
|
|
|
|
|
|
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) |
|
|
if not scaleup: |
|
|
r = min(r, 1.0) |
|
|
|
|
|
|
|
|
ratio = r, r |
|
|
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) |
|
|
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] |
|
|
if auto: |
|
|
dw, dh = np.mod(dw, 32), np.mod(dh, 32) |
|
|
elif scaleFill: |
|
|
dw, dh = 0.0, 0.0 |
|
|
new_unpad = (new_shape[1], new_shape[0]) |
|
|
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] |
|
|
|
|
|
dw /= 2 |
|
|
dh /= 2 |
|
|
if shape[::-1] != new_unpad: |
|
|
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_AREA) |
|
|
|
|
|
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) |
|
|
left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) |
|
|
img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) |
|
|
return img, ratio, (dw, dh) |
|
|
|
|
|
|
|
|
def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False, |
|
|
labels=()): |
|
|
"""Runs Non-Maximum Suppression (NMS) on inference results |
|
|
|
|
|
Returns: |
|
|
list of detections, on (n,6) tensor per image [xyxy, conf, cls] |
|
|
""" |
|
|
|
|
|
nc = prediction.shape[2] - 5 |
|
|
xc = prediction[..., 4] > conf_thres |
|
|
|
|
|
|
|
|
min_wh, max_wh = 2, 4096 |
|
|
max_det = 300 |
|
|
max_nms = 30000 |
|
|
time_limit = 10.0 |
|
|
redundant = True |
|
|
multi_label &= nc > 1 |
|
|
merge = False |
|
|
|
|
|
t = time.time() |
|
|
output = [torch.zeros((0, 6), device=prediction.device)] * prediction.shape[0] |
|
|
for xi, x in enumerate(prediction): |
|
|
|
|
|
|
|
|
x = x[xc[xi]] |
|
|
|
|
|
|
|
|
if labels and len(labels[xi]): |
|
|
l = labels[xi] |
|
|
v = torch.zeros((len(l), nc + 5), device=x.device) |
|
|
v[:, :4] = l[:, 1:5] |
|
|
v[:, 4] = 1.0 |
|
|
v[range(len(l)), l[:, 0].long() + 5] = 1.0 |
|
|
x = torch.cat((x, v), 0) |
|
|
|
|
|
|
|
|
if not x.shape[0]: |
|
|
continue |
|
|
|
|
|
|
|
|
if nc == 1: |
|
|
x[:, 5:] = x[:, 4:5] |
|
|
|
|
|
else: |
|
|
x[:, 5:] *= x[:, 4:5] |
|
|
|
|
|
|
|
|
box = xywh2xyxy(x[:, :4]) |
|
|
|
|
|
|
|
|
if multi_label: |
|
|
i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T |
|
|
x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1) |
|
|
else: |
|
|
conf, j = x[:, 5:].max(1, keepdim=True) |
|
|
x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres] |
|
|
|
|
|
|
|
|
if classes is not None: |
|
|
x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
n = x.shape[0] |
|
|
if not n: |
|
|
continue |
|
|
elif n > max_nms: |
|
|
x = x[x[:, 4].argsort(descending=True)[:max_nms]] |
|
|
|
|
|
|
|
|
c = x[:, 5:6] * (0 if agnostic else max_wh) |
|
|
boxes, scores = x[:, :4] + c, x[:, 4] |
|
|
i = torchvision.ops.nms(boxes, scores, iou_thres) |
|
|
if i.shape[0] > max_det: |
|
|
i = i[:max_det] |
|
|
if merge and (1 < n < 3E3): |
|
|
|
|
|
iou = box_iou(boxes[i], boxes) > iou_thres |
|
|
weights = iou * scores[None] |
|
|
x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True) |
|
|
if redundant: |
|
|
i = i[iou.sum(1) > 1] |
|
|
|
|
|
output[xi] = x[i] |
|
|
if (time.time() - t) > time_limit: |
|
|
print(f'WARNING: NMS time limit {time_limit}s exceeded') |
|
|
break |
|
|
|
|
|
return output |
|
|
|
|
|
|
|
|
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None): |
|
|
|
|
|
if ratio_pad is None: |
|
|
gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) |
|
|
pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 |
|
|
else: |
|
|
gain = ratio_pad[0][0] |
|
|
pad = ratio_pad[1] |
|
|
|
|
|
coords[:, [0, 2]] -= pad[0] |
|
|
coords[:, [1, 3]] -= pad[1] |
|
|
coords[:, :4] /= gain |
|
|
clip_coords(coords, img0_shape) |
|
|
return coords |
|
|
|
|
|
|
|
|
def clip_coords(boxes, img_shape): |
|
|
|
|
|
boxes[:, 0].clamp_(0, img_shape[1]) |
|
|
boxes[:, 1].clamp_(0, img_shape[0]) |
|
|
boxes[:, 2].clamp_(0, img_shape[1]) |
|
|
boxes[:, 3].clamp_(0, img_shape[0]) |
|
|
|
|
|
|
|
|
def plot_one_box(x, img, color=None, label=None, line_thickness=3): |
|
|
|
|
|
tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 |
|
|
color = color if color else [random.randint(0, 255) for _ in range(3)] |
|
|
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3])) |
|
|
cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA) |
|
|
|
|
|
blk = np.zeros(img.shape, np.uint8) |
|
|
cv2.rectangle(blk, c1, c2, [255, 255, 0], -1, lineType=cv2.LINE_AA) |
|
|
img = cv2.addWeighted(img, 1.0, blk, 0.00, 1) |
|
|
if label: |
|
|
tf = max(tl - 1, 1) |
|
|
t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0] |
|
|
c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3 |
|
|
cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA) |
|
|
cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [0, 0, 0], thickness=tf, lineType=cv2.LINE_AA) |
|
|
return img |
|
|
|
|
|
|
|
|
def driving_area_mask(da_seg_out, width, height, pad_w, pad_h, ratio): |
|
|
da_predict = da_seg_out[:, :, pad_h:(height - pad_h), pad_w:(width - pad_w)] |
|
|
da_seg_mask = torch.nn.functional.interpolate(da_predict, scale_factor=int(1 / ratio), mode='bilinear') |
|
|
_, da_seg_mask = torch.max(da_seg_mask, 1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
da_seg_mask = da_seg_mask.int().squeeze().cpu().numpy() |
|
|
return da_seg_mask |
|
|
|
|
|
|
|
|
def lane_line_mask(ll_seg_out, width, height, pad_w, pad_h, ratio): |
|
|
ll_seg_mask = ll_seg_out[:, :1, pad_h:(height - pad_h), pad_w:(width - pad_w)] |
|
|
ll_seg_mask = torch.nn.functional.interpolate(ll_seg_mask, scale_factor=int(1 / ratio), mode='bilinear') |
|
|
ll_seg_mask = torch.where(ll_seg_mask > 0.90, 0, 1) |
|
|
ll_seg_mask = ll_seg_mask.int().squeeze().cpu().numpy() |
|
|
return ll_seg_mask |
|
|
|
|
|
|
|
|
def show_seg_result(img, result, index=0, epoch=0, batch=0, save_dir=None, is_ll=False, palette=None, is_demo=False, |
|
|
is_gt=False, color=None, alpha_da=0.5, alpha_ll=0.5): |
|
|
if palette is None: |
|
|
palette = np.random.randint(0, 255, size=(3, 3)) |
|
|
palette[0] = [0, 0, 0] |
|
|
palette[1] = [0, 255, 0] |
|
|
palette[2] = [255, 0, 0] |
|
|
palette = np.array(palette) |
|
|
assert palette.shape[0] == 3 |
|
|
assert palette.shape[1] == 3 |
|
|
assert len(palette.shape) == 2 |
|
|
|
|
|
if not is_demo: |
|
|
color_seg = np.zeros((result.shape[0], result.shape[1], 3), dtype=np.uint8) |
|
|
for label, color in enumerate(palette): |
|
|
color_seg[result == label, :] = color |
|
|
color_seg = color_seg[..., ::-1] |
|
|
color_mask = np.mean(color_seg, 2) |
|
|
img[color_mask != 0] = img[color_mask != 0] * 0.5 + color_seg[color_mask != 0] * 0.5 |
|
|
else: |
|
|
color_da = np.zeros((result[0].shape[0], result[0].shape[1], 3), dtype=np.uint8) |
|
|
color_ll = np.zeros((result[0].shape[0], result[0].shape[1], 3), dtype=np.uint8) |
|
|
if color is not None: |
|
|
color_da[result[0] == 1] = color[1] |
|
|
color_ll[result[1] == 1] = color[2] |
|
|
else: |
|
|
color_da[result[0] == 1] = [0, 255, 0] |
|
|
color_ll[result[1] == 1] = [255, 0, 0] |
|
|
|
|
|
|
|
|
color_da = color_da[..., ::-1] |
|
|
color_ll = color_ll[..., ::-1] |
|
|
|
|
|
color_mask_da = np.mean(color_da, 2) |
|
|
color_mask_ll = np.mean(color_ll, 2) |
|
|
|
|
|
img[color_mask_da != 0] = img[color_mask_da != 0] * (1 - alpha_da) + color_da[color_mask_da != 0] * alpha_da |
|
|
img[color_mask_ll != 0] = img[color_mask_ll != 0] * (1 - alpha_ll) + color_ll[color_mask_ll != 0] * alpha_ll |
|
|
|
|
|
return img |
|
|
|
|
|
|
|
|
def git_describe(path=Path(__file__).parent): |
|
|
|
|
|
s = f'git -C {path} describe --tags --long --always' |
|
|
try: |
|
|
return subprocess.check_output(s, shell=True, stderr=subprocess.STDOUT).decode()[:-1] |
|
|
except subprocess.CalledProcessError as e: |
|
|
return '' |
|
|
|
|
|
|
|
|
def date_modified(path=__file__): |
|
|
|
|
|
t = datetime.datetime.fromtimestamp(Path(path).stat().st_mtime) |
|
|
return f'{t.year}-{t.month}-{t.day}' |
|
|
|
|
|
|
|
|
def select_device(logger=None, device='', batch_size=None): |
|
|
|
|
|
s = f'mtpnet 🚀 {git_describe() or date_modified()} torch {torch.__version__} ' |
|
|
cpu = device.lower() == 'cpu' |
|
|
if cpu: |
|
|
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' |
|
|
elif device: |
|
|
os.environ['CUDA_VISIBLE_DEVICES'] = device |
|
|
assert torch.cuda.is_available(), f'CUDA unavailable, invalid device {device} requested' |
|
|
|
|
|
cuda = not cpu and torch.cuda.is_available() |
|
|
if cuda: |
|
|
n = torch.cuda.device_count() |
|
|
if n > 1 and batch_size: |
|
|
assert batch_size % n == 0, f'batch-size {batch_size} not multiple of GPU count {n}' |
|
|
space = ' ' * len(s) |
|
|
for i, d in enumerate(device.split(',') if device else range(n)): |
|
|
p = torch.cuda.get_device_properties(i) |
|
|
s += f"{'' if i == 0 else space}CUDA:{d} ({p.name}, {p.total_memory / 1024 ** 2}MB)\n" |
|
|
else: |
|
|
s += 'CPU\n' |
|
|
|
|
|
if logger: |
|
|
logger.info(s.encode().decode('ascii', 'ignore') if platform.system() == 'Windows' else s) |
|
|
return torch.device('cuda:0' if cuda else 'cpu') |
|
|
|
|
|
|
|
|
class AverageMeter(object): |
|
|
"""Computes and stores the average and current value""" |
|
|
|
|
|
def __init__(self): |
|
|
self.val = 0. |
|
|
self.avg = 0. |
|
|
self.sum = 0. |
|
|
self.count = 0. |
|
|
|
|
|
def reset(self): |
|
|
self.val = 0. |
|
|
self.avg = 0. |
|
|
self.sum = 0. |
|
|
self.count = 0. |
|
|
|
|
|
def update(self, val, n=1): |
|
|
self.val = val |
|
|
self.sum += val * n |
|
|
self.count += n |
|
|
self.avg = self.sum / self.count if self.count != 0 else 0 |
|
|
|
|
|
|
|
|
def time_synchronized(): |
|
|
|
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.synchronize() |
|
|
return time.time() |