mtpnet / yolop_demo.py
erlinersi's picture
Update yolop_demo.py
a11322c
import argparse
import glob
import math
import os
import random
import time
from pathlib import Path
import cv2
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from torch import nn
from torch.nn import Upsample
from torchvision import transforms
from tqdm import tqdm
from utils.utils_demo import select_device, AverageMeter, time_synchronized, non_max_suppression
# os.system("wget https://github.com/hustvl/YOLOP/raw/main/weights/End-to-end.pth")
def autopad(k, p=None):  # kernel, padding
    """Return the padding to use for kernel size ``k`` (pad to 'same').

    Args:
        k: Kernel size, either an int or an iterable of per-dimension ints.
        p: Explicit padding. When given (anything other than ``None``) it is
            returned unchanged.

    Returns:
        ``p`` if it was supplied; otherwise ``k // 2`` for an int kernel, or a
        list with each kernel dimension floor-divided by two.
    """
    if p is not None:
        return p
    if isinstance(k, int):
        return k // 2
    return [side // 2 for side in k]
class Hardswish(nn.Module):  # export-friendly version of nn.Hardswish()
    """Hard-swish activation computed as ``x * hardtanh(x + 3, 0, 6) / 6``."""

    @staticmethod
    def forward(x):
        """Apply hard-swish element-wise to tensor ``x``."""
        # Spelled with hardtanh (rather than x * F.hardsigmoid(x)) so the op
        # works for torchscript, CoreML and ONNX.
        gate = F.hardtanh(x + 3, 0., 6.)
        return x * gate / 6.
class Conv(nn.Module):
    """Standard convolution block: bias-free Conv2d -> BatchNorm2d -> activation.

    Args:
        c1: Input channels.
        c2: Output channels.
        k: Kernel size.
        s: Stride.
        p: Padding; ``None`` lets ``autopad`` derive it from ``k``.
        g: Convolution groups.
        act: Truthy selects ``Hardswish``; falsy selects ``nn.Identity``.
    """

    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        super().__init__()
        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False)
        self.bn = nn.BatchNorm2d(c2)
        # Hardswish is the class defined in this file, so no fallback is
        # needed; the former bare ``except`` could only mask real errors.
        self.act = Hardswish() if act else nn.Identity()

    def forward(self, x):
        """Run convolution, batch norm and activation."""
        return self.act(self.bn(self.conv(x)))

    def fuseforward(self, x):
        """Run convolution and activation only, skipping ``self.bn``."""
        return self.act(self.conv(x))
class Focus(nn.Module):
    """Focus width/height information into channel space: slice, concat, conv.

    The constructor arguments are forwarded to ``Conv`` with the input channel
    count multiplied by four, matching the four slices concatenated in
    ``forward``.
    """

    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        super().__init__()
        self.conv = Conv(c1 * 4, c2, k, s, p, g, act)

    def forward(self, x):  # x(b,c,w,h) -> y(b,4c,w/2,h/2)
        """Stack the four stride-2 sub-grids of ``x`` on dim 1 and convolve."""
        even_rows = x[..., ::2, :]
        odd_rows = x[..., 1::2, :]
        # Order matters for pretrained weights: (even, even), (odd, even),
        # (even, odd), (odd, odd) in (row, column) parity.
        patches = [
            even_rows[..., ::2],
            odd_rows[..., ::2],
            even_rows[..., 1::2],
            odd_rows[..., 1::2],
        ]
        return self.conv(torch.cat(patches, 1))
class BottleneckCSP(nn.Module):
    """CSP bottleneck, see https://github.com/WongKinYiu/CrossStagePartialNetworks.

    Args:
        c1: Input channels.
        c2: Output channels.
        n: Number of stacked ``Bottleneck`` modules.
        shortcut: Forwarded to each ``Bottleneck``.
        g: Groups, forwarded to each ``Bottleneck``.
        e: Expansion ratio; the hidden width is ``int(c2 * e)``.
    """

    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        super().__init__()
        hidden = int(c2 * e)
        self.cv1 = Conv(c1, hidden, 1, 1)
        self.cv2 = nn.Conv2d(c1, hidden, 1, 1, bias=False)
        self.cv3 = nn.Conv2d(hidden, hidden, 1, 1, bias=False)
        self.cv4 = Conv(2 * hidden, c2, 1, 1)
        self.bn = nn.BatchNorm2d(2 * hidden)  # applied to the concatenated branches
        self.act = nn.LeakyReLU(0.1, inplace=True)
        self.m = nn.Sequential(*(Bottleneck(hidden, hidden, shortcut, g, e=1.0) for _ in range(n)))

    def forward(self, x):
        """Concatenate the bottleneck branch with the plain 1x1 branch, then fuse."""
        deep_branch = self.cv3(self.m(self.cv1(x)))
        plain_branch = self.cv2(x)
        merged = torch.cat((deep_branch, plain_branch), dim=1)
        return self.cv4(self.act(self.bn(merged)))
class Bottleneck(nn.Module):
    """Standard bottleneck: 1x1 ``Conv`` then 3x3 ``Conv``, with optional residual add.

    The residual connection is used only when ``shortcut`` is truthy and the
    input and output channel counts are equal.
    """

    def __init__(self, c1, c2, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, shortcut, groups, expansion
        super().__init__()
        hidden = int(c2 * e)
        self.cv1 = Conv(c1, hidden, 1, 1)
        self.cv2 = Conv(hidden, c2, 3, 1, g=g)
        self.add = shortcut and c1 == c2

    def forward(self, x):
        """Return ``cv2(cv1(x))``, plus ``x`` when the shortcut is enabled."""
        out = self.cv2(self.cv1(x))
        if self.add:
            return x + out
        return out
class SPP(nn.Module):
    """Spatial pyramid pooling layer used in YOLOv3-SPP.

    Args:
        c1: Input channels.
        c2: Output channels.
        k: Max-pool kernel sizes; each pool uses stride 1 and padding
            ``size // 2``.
    """

    def __init__(self, c1, c2, k=(5, 9, 13)):
        super().__init__()
        hidden = c1 // 2
        self.cv1 = Conv(c1, hidden, 1, 1)
        # One input slot for the un-pooled features plus one per pool size.
        self.cv2 = Conv(hidden * (len(k) + 1), c2, 1, 1)
        self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=size, stride=1, padding=size // 2) for size in k])

    def forward(self, x):
        """Concatenate ``cv1(x)`` with each pooled copy on dim 1, then apply ``cv2``."""
        x = self.cv1(x)
        pooled = [pool(x) for pool in self.m]
        return self.cv2(torch.cat([x, *pooled], 1))
class Concat(nn.Module):
    """Concatenate a list of tensors along a fixed dimension."""

    def __init__(self, dimension=1):
        super().__init__()
        self.d = dimension

    def forward(self, x):
        """Return ``torch.cat`` of the tensors in ``x`` along ``self.d``."""
        axis = self.d
        return torch.cat(x, axis)
class Detect(nn.Module):
    """Anchor-based detection head.

    Args:
        nc: Number of classes.
        anchors: One flat sequence of (w, h) anchor values per detection layer.
        ch: Input channel count for each detection layer.
    """

    stride = None  # strides computed during build

    def __init__(self, nc=13, anchors=(), ch=()):  # detection layer
        super().__init__()
        self.nc = nc  # number of classes
        self.no = nc + 5  # number of outputs per anchor
        self.nl = len(anchors)  # number of detection layers
        self.na = len(anchors[0]) // 2  # number of anchors per layer
        self.grid = [torch.zeros(1)] * self.nl  # placeholder grids, rebuilt on first inference
        anchor_pairs = torch.tensor(anchors).float().view(self.nl, -1, 2)
        self.register_buffer('anchors', anchor_pairs)  # shape(nl,na,2)
        self.register_buffer('anchor_grid', anchor_pairs.clone().view(self.nl, 1, -1, 1, 1, 2))  # shape(nl,1,na,1,1,2)
        self.m = nn.ModuleList(nn.Conv2d(c_in, self.no * self.na, 1) for c_in in ch)  # output conv

    def forward(self, x):
        """Run the head on a list of feature maps (the list is updated in place).

        Returns:
            In training mode, the list ``x`` with each entry reshaped to
            ``(bs, na, ny, nx, no)``. Otherwise a tuple of the decoded
            predictions concatenated to ``(bs, N, no)`` and that same list.
        """
        decoded = []  # inference output
        for i in range(self.nl):
            raw = self.m[i](x[i])  # conv
            bs, _, ny, nx = raw.shape
            # (bs, na*no, ny, nx) -> (bs, na, ny, nx, no)
            raw = raw.view(bs, self.na, self.no, ny * nx).permute(0, 1, 3, 2)
            raw = raw.view(bs, self.na, ny, nx, self.no).contiguous()
            x[i] = raw
            if self.training:
                continue

            # Rebuild the cell-offset grid whenever the feature-map size changes.
            if self.grid[i].shape[2:4] != raw.shape[2:4]:
                self.grid[i] = self._make_grid(nx, ny).to(raw.device)
            y = raw.sigmoid()
            y[..., 0:2] = (y[..., 0:2] * 2. - 0.5 + self.grid[i].to(raw.device)) * self.stride[i]  # xy
            y[..., 2:4] = (y[..., 2:4] * 2) ** 2 * self.anchor_grid[i]  # wh
            decoded.append(y.view(bs, -1, self.no))

        if self.training:
            return x
        return torch.cat(decoded, 1), x

    @staticmethod
    def _make_grid(nx=20, ny=20):
        """Return a float ``(1, 1, ny, nx, 2)`` tensor holding each cell's (x, y) index."""
        rows, cols = torch.meshgrid([torch.arange(ny), torch.arange(nx)])
        cells = torch.stack((cols, rows), 2)
        return cells.view((1, 1, ny, nx, 2)).float()
def check_anchor_order(m):
    """Make the anchor order of Detect() module ``m`` agree with its stride order.

    Compares the sign of (last anchor area - first anchor area) with the sign
    of (last stride - first stride). When they differ, ``m.anchors`` and
    ``m.anchor_grid`` are reversed in place along dim 0.
    """
    areas = m.anchor_grid.prod(-1).view(-1)  # anchor area
    area_trend = areas[-1] - areas[0]
    stride_trend = m.stride[-1] - m.stride[0]
    if area_trend.sign() == stride_trend.sign():
        return  # already in the same order
    print('Reversing anchor order')
    m.anchors[:] = m.anchors.flip(0)
    m.anchor_grid[:] = m.anchor_grid.flip(0)
def initialize_weights(model):
    """Adjust module hyper-parameters in place; no weights are modified.

    Modules whose exact type is ``nn.BatchNorm2d`` get ``eps=1e-3`` and
    ``momentum=0.03``. Modules whose exact type is one of the listed
    activations get ``inplace=True``. Everything else, including
    ``nn.Conv2d``, is left untouched.
    """
    inplace_types = (nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6)
    for module in model.modules():
        kind = type(module)  # exact type match; subclasses are skipped
        if kind is nn.BatchNorm2d:
            module.eps = 1e-3
            module.momentum = 0.03
        elif kind in inplace_types:
            module.inplace = True
class LoadImages:  # for inference
    """Iterate over the images and videos found at a path.

    Args:
        path: A single file, a directory (its ``*.*`` entries are scanned) or
            a glob pattern containing ``*``.
        img_size: Target size forwarded to ``letterbox_for_img``.

    Each iteration yields ``(path, img, img0, cap, shapes)`` where ``img0`` is
    the frame resized to 1280x720, ``img`` is its letterboxed copy, ``cap`` is
    the current ``cv2.VideoCapture`` (``None`` when there are no videos) and
    ``shapes`` is ``((h0, w0), ((h / h0, w / w0), pad))`` with ``h0, w0`` the
    frame size as read from disk and ``h, w`` the letterboxed size.

    Raises:
        FileNotFoundError: If ``path`` is not a file, directory or glob.
        AssertionError: If no supported image or video file is found.
    """

    def __init__(self, path, img_size=640):
        p = os.path.abspath(str(Path(path)))  # os-agnostic absolute path
        if '*' in p:
            files = sorted(glob.glob(p, recursive=True))  # glob
        elif os.path.isdir(p):
            files = sorted(glob.glob(os.path.join(p, '*.*')))  # dir
        elif os.path.isfile(p):
            files = [p]  # files
        else:
            # FileNotFoundError is still an Exception, so existing handlers keep working.
            raise FileNotFoundError('ERROR: %s does not exist' % p)

        img_formats = ['.bmp', '.jpg', '.jpeg', '.png', '.tif', '.tiff', '.dng']
        vid_formats = ['.mov', '.avi', '.mp4', '.mpg', '.mpeg', '.m4v', '.wmv', '.mkv']
        images = [x for x in files if os.path.splitext(x)[-1].lower() in img_formats]
        videos = [x for x in files if os.path.splitext(x)[-1].lower() in vid_formats]
        ni, nv = len(images), len(videos)

        self.img_size = img_size
        self.files = images + videos  # images are always served before videos
        self.nf = ni + nv  # number of files
        self.video_flag = [False] * ni + [True] * nv
        self.mode = 'images'
        if videos:
            self.new_video(videos[0])  # new video
        else:
            self.cap = None
        assert self.nf > 0, 'No images or videos found in %s. Supported formats are:\nimages: %s\nvideos: %s' % \
                            (p, img_formats, vid_formats)

    def __iter__(self):
        self.count = 0
        return self

    def __next__(self):
        if self.count == self.nf:
            raise StopIteration
        path = self.files[self.count]

        if self.video_flag[self.count]:
            # Read video
            self.mode = 'video'
            ret_val, img0 = self.cap.read()
            # Advance past every exhausted or unreadable video. A single
            # ``if`` here left ``img0`` as None when the next video yielded no
            # frame, which then crashed on ``img0.shape``.
            while not ret_val:
                self.count += 1
                self.cap.release()
                if self.count == self.nf:  # last video
                    raise StopIteration
                path = self.files[self.count]
                self.new_video(path)
                ret_val, img0 = self.cap.read()
            h0, w0 = img0.shape[:2]
            self.frame += 1
            print('\n video %g/%g (%g/%g) %s: ' % (self.count + 1, self.nf, self.frame, self.nframes, path), end='')
        else:
            # Read image
            self.count += 1
            img0 = cv2.imread(path, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION)  # BGR
            assert img0 is not None, 'Image Not Found ' + path
            print('image %g/%g %s: \n' % (self.count, self.nf, path), end='')
            h0, w0 = img0.shape[:2]

        # Every frame is first normalised to 1280x720, then letterboxed.
        img0 = cv2.resize(img0, (1280, 720), interpolation=cv2.INTER_LINEAR)
        img, ratio, pad = letterbox_for_img(img0, new_shape=self.img_size, auto=True)
        h, w = img.shape[:2]
        shapes = (h0, w0), ((h / h0, w / w0), pad)

        # No BGR->RGB or HWC->CHW conversion happens here; only contiguity is enforced.
        img = np.ascontiguousarray(img)
        return path, img, img0, self.cap, shapes

    def new_video(self, path):
        """Open ``path`` as the current video and reset the frame counter."""
        self.frame = 0
        self.cap = cv2.VideoCapture(path)
        self.nframes = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    def __len__(self):
        return self.nf  # number of files
def letterbox_for_img(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True):
    """Resize ``img`` keeping aspect ratio and pad it with a constant border.

    See https://github.com/ultralytics/yolov3/issues/232.

    Args:
        img: Image array whose first two dimensions are (height, width).
        new_shape: Target (height, width), or one int used for both.
        color: Border colour.
        auto: Reduce the padding to its remainder modulo 32 (minimum rectangle).
        scaleFill: Only consulted when ``auto`` is falsy; stretch to
            ``new_shape`` with no padding.
        scaleup: When falsy, never enlarge the image (ratio capped at 1.0).

    Returns:
        ``(img, ratio, (dw, dh))``: the padded image, the (width, height)
        scale ratios, and the per-side padding before integer rounding.
    """
    src_shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    scale = min(new_shape[0] / src_shape[0], new_shape[1] / src_shape[1])
    if not scaleup:  # only scale down, do not scale up
        scale = min(scale, 1.0)

    ratio = scale, scale  # width, height ratios
    new_unpad = int(round(src_shape[1] * scale)), int(round(src_shape[0] * scale))
    dw = new_shape[1] - new_unpad[0]  # total width padding
    dh = new_shape[0] - new_unpad[1]  # total height padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, 32), np.mod(dh, 32)
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / src_shape[1], new_shape[0] / src_shape[0]  # width, height ratios

    # Split the padding between the two sides.
    dw /= 2
    dh /= 2

    if src_shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_AREA)
    # The +/-0.1 nudges put an odd pixel of padding on the bottom/right side.
    top = int(round(dh - 0.1))
    bottom = int(round(dh + 0.1))
    left = int(round(dw - 0.1))
    right = int(round(dw + 0.1))
    img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return img, ratio, (dw, dh)
# Network layout consumed by MCnet. The first row holds the output layer
# indices; every following row is [from, block, args], where `from` is -1 for
# "previous layer's output", a layer index, or a list of those, and `args` is
# unpacked into the block's constructor. Trailing numbers are the layer index.
# The lane-line and drivable-area segmentation branches both start from layer
# 16 and share no information or link with each other.
YOLOP = [
    [24, 33, 42],  # Det_out_idx, Da_Segout_idx, LL_Segout_idx
    [-1, Focus, [3, 32, 3]],  # 0
    [-1, Conv, [32, 64, 3, 2]],  # 1
    [-1, BottleneckCSP, [64, 64, 1]],  # 2
    [-1, Conv, [64, 128, 3, 2]],  # 3
    [-1, BottleneckCSP, [128, 128, 3]],  # 4
    [-1, Conv, [128, 256, 3, 2]],  # 5
    [-1, BottleneckCSP, [256, 256, 3]],  # 6
    [-1, Conv, [256, 512, 3, 2]],  # 7
    [-1, SPP, [512, 512, [5, 9, 13]]],  # 8
    [-1, BottleneckCSP, [512, 512, 1, False]],  # 9
    [-1, Conv, [512, 256, 1, 1]],  # 10
    [-1, Upsample, [None, 2, 'nearest']],  # 11
    [[-1, 6], Concat, [1]],  # 12
    [-1, BottleneckCSP, [512, 256, 1, False]],  # 13
    [-1, Conv, [256, 128, 1, 1]],  # 14
    [-1, Upsample, [None, 2, 'nearest']],  # 15
    [[-1, 4], Concat, [1]],  # 16 # Encoder output; both segmentation heads read from here
    [-1, BottleneckCSP, [256, 128, 1, False]],  # 17
    [-1, Conv, [128, 128, 3, 2]],  # 18
    [[-1, 14], Concat, [1]],  # 19
    [-1, BottleneckCSP, [256, 256, 1, False]],  # 20
    [-1, Conv, [256, 256, 3, 2]],  # 21
    [[-1, 10], Concat, [1]],  # 22
    [-1, BottleneckCSP, [512, 512, 1, False]],  # 23
    [[17, 20, 23], Detect,
     [1, [[3, 9, 5, 11, 4, 20], [7, 18, 6, 39, 12, 31], [19, 50, 38, 81, 68, 157]], [128, 256, 512]]],
    # Detection head 24: args are (nc, anchors per layer, input channels per layer)
    [16, Conv, [256, 128, 3, 1]],  # 25
    [-1, Upsample, [None, 2, 'nearest']],  # 26
    [-1, BottleneckCSP, [128, 64, 1, False]],  # 27
    [-1, Conv, [64, 32, 3, 1]],  # 28
    [-1, Upsample, [None, 2, 'nearest']],  # 29
    [-1, Conv, [32, 16, 3, 1]],  # 30
    [-1, BottleneckCSP, [16, 8, 1, False]],  # 31
    [-1, Upsample, [None, 2, 'nearest']],  # 32
    [-1, Conv, [8, 2, 3, 1]],  # 33 Driving area segmentation head
    [16, Conv, [256, 128, 3, 1]],  # 34
    [-1, Upsample, [None, 2, 'nearest']],  # 35
    [-1, BottleneckCSP, [128, 64, 1, False]],  # 36
    [-1, Conv, [64, 32, 3, 1]],  # 37
    [-1, Upsample, [None, 2, 'nearest']],  # 38
    [-1, Conv, [32, 16, 3, 1]],  # 39
    [-1, BottleneckCSP, [16, 8, 1, False]],  # 40
    [-1, Upsample, [None, 2, 'nearest']],  # 41
    [-1, Conv, [8, 2, 3, 1]]  # 42 Lane line segmentation head
]
def get_net(cfg, **kwargs):
    """Build an ``MCnet`` from the module-level ``YOLOP`` layout.

    Args:
        cfg: Accepted for interface compatibility; not read by this function.
        **kwargs: Forwarded unchanged to ``MCnet``.

    Returns:
        The constructed ``MCnet`` instance.
    """
    return MCnet(YOLOP, **kwargs)
class MCnet(nn.Module):
    """Multi-output network assembled from a block configuration list.

    Args:
        block_cfg: ``block_cfg[0]`` is ``[det_out_idx, *seg_out_idx]``; each
            later entry is ``[from_, block, args]`` and becomes one layer.
        **kwargs: Accepted but not used by the constructor.
    """

    def __init__(self, block_cfg, **kwargs):
        super().__init__()
        header, specs = block_cfg[0], block_cfg[1:]
        layers, save = [], []
        self.nc = 1
        self.detector_index = -1
        self.det_out_idx = header[0]
        self.seg_out_idx = header[1:]

        # Build model
        for i, (from_, block, args) in enumerate(specs):
            if isinstance(block, str):
                # NOTE(review): eval() of a config string; only safe while the
                # configuration is trusted, in-repo data.
                block = eval(block)
            if block is Detect:
                self.detector_index = i
            layer = block(*args)
            layer.index, layer.from_ = i, from_
            layers.append(layer)
            sources = [from_] if isinstance(from_, int) else from_
            # Remember which layer outputs a later layer reads (anything but -1).
            save.extend(src % i for src in sources if src != -1)
        assert self.detector_index == header[0]

        self.model, self.save = nn.Sequential(*layers), sorted(save)
        self.names = [str(i) for i in range(self.nc)]

        # Set stride and anchors for the detector
        Detector = self.model[self.detector_index]
        if isinstance(Detector, Detect):
            s = 128  # side length of the probe image
            with torch.no_grad():
                # The module is still in training mode here, so the first
                # output is the list of raw per-layer detection maps.
                detects, _, _ = self.forward(torch.zeros(1, 3, s, s))
                Detector.stride = torch.tensor([s / fmap.shape[-2] for fmap in detects])
            Detector.anchors /= Detector.stride.view(-1, 1, 1)  # anchors in units of each layer's stride
            check_anchor_order(Detector)
            self.stride = Detector.stride
            self._initialize_biases()

        initialize_weights(self)

    def forward(self, x):
        """Run all layers and return ``[det_out, *seg_outs]``.

        ``det_out`` is the detector layer's output. Each segmentation output
        is the sigmoid of the layer listed in ``self.seg_out_idx``, in layer
        order.
        """
        cache = []
        seg_outs = []
        det_out = None
        for i, block in enumerate(self.model):
            src = block.from_
            if src != -1:
                if isinstance(src, int):
                    x = cache[src]
                else:
                    # Concat/Detect inputs: -1 means the running tensor.
                    x = [x if j == -1 else cache[j] for j in src]
            x = block(x)
            if i in self.seg_out_idx:  # segmentation head output
                seg_outs.append(torch.sigmoid(x))
            if i == self.detector_index:
                det_out = x
            # Keep only outputs that a later layer will read.
            cache.append(x if block.index in self.save else None)
        return [det_out] + seg_outs

    def _initialize_biases(self, cf=None):  # initialize biases into Detect(), cf is class frequency
        """Offset the Detect() conv biases, see https://arxiv.org/abs/1708.02002 section 3.3."""
        detector = self.model[self.detector_index]  # Detect() module
        for conv, s in zip(detector.m, detector.stride):
            bias = conv.bias.view(detector.na, -1)  # one row per anchor
            bias.data[:, 4] += math.log(8 / (640 / s) ** 2)  # obj (8 objects per 640 image)
            bias.data[:, 5:] += math.log(0.6 / (detector.nc - 0.99)) if cf is None else torch.log(cf / cf.sum())  # cls
            conv.bias = torch.nn.Parameter(bias.view(-1), requires_grad=True)
def show_seg_result(img, result, index, epoch, save_dir=None, is_ll=False, palette=None, is_demo=False, is_gt=False,
                    color=None, alpha_da=0.5, alpha_ll=0.5):
    """Blend segmentation masks onto ``img`` and return a 1280x720 uint8 image.

    Args:
        img: Image array; masked pixels are modified in place before the
            final cast and resize.
        result: In demo mode a pair ``(da_mask, ll_mask)`` where pixels equal
            to 1 are painted; otherwise a single label map whose values index
            ``palette``.
        index, epoch: Only used to build the output file name in non-demo mode.
        save_dir: Directory for the written PNG in non-demo mode. ``None``
            skips writing (it previously raised ``TypeError``).
        is_ll: Non-demo only; selects the ``ll`` file-name tag instead of ``da``.
        palette: Non-demo colour table, 3 rows of 3 values. Defaults to
            black / (0, 255, 0) / (255, 0, 0).
        is_demo: Selects demo mode.
        is_gt: Non-demo only; selects the ``seg_gt`` file-name suffix.
        color: Demo only; when given, ``color[1]`` paints the first mask and
            ``color[2]`` the second. Channels are reversed before blending.
        alpha_da, alpha_ll: Demo only; blend weights of the two overlays.

    Returns:
        The blended image as ``np.uint8``, resized to 1280x720.
    """
    if palette is None:
        # Fixed default table. The original drew random values and then
        # overwrote every row, which only consumed global RNG state.
        palette = [[0, 0, 0], [0, 255, 0], [255, 0, 0]]
    palette = np.array(palette)
    assert palette.shape[0] == 3  # len(classes)
    assert palette.shape[1] == 3
    assert len(palette.shape) == 2

    if not is_demo:
        color_seg = np.zeros((result.shape[0], result.shape[1], 3), dtype=np.uint8)
        # Loop variable renamed so it no longer shadows the ``color`` parameter.
        for label, label_color in enumerate(palette):
            color_seg[result == label, :] = label_color
        color_seg = color_seg[..., ::-1]  # reverse channel order
        painted = np.mean(color_seg, 2) != 0
        img[painted] = img[painted] * 0.5 + color_seg[painted] * 0.5
    else:
        da_mask, ll_mask = result[0], result[1]
        canvas_shape = (da_mask.shape[0], da_mask.shape[1], 3)
        color_da = np.zeros(canvas_shape, dtype=np.uint8)
        color_ll = np.zeros(canvas_shape, dtype=np.uint8)
        if color is not None:
            color_da[da_mask == 1] = color[1]
            color_ll[ll_mask == 1] = color[2]
        else:
            color_da[da_mask == 1] = [0, 255, 0]
            color_ll[ll_mask == 1] = [255, 0, 0]

        # Reverse channel order (the original comment calls this "convert to BGR").
        color_da = color_da[..., ::-1]
        color_ll = color_ll[..., ::-1]

        painted_da = np.mean(color_da, 2) != 0
        painted_ll = np.mean(color_ll, 2) != 0
        # The second overlay is blended after the first, so it wins on overlap.
        img[painted_da] = img[painted_da] * (1 - alpha_da) + color_da[painted_da] * alpha_da
        img[painted_ll] = img[painted_ll] * (1 - alpha_ll) + color_ll[painted_ll] * alpha_ll

    img = img.astype(np.uint8)
    img = cv2.resize(img, (1280, 720), interpolation=cv2.INTER_LINEAR)

    if not is_demo and save_dir is not None:
        branch = "ll" if is_ll else "da"
        suffix = "seg_gt" if is_gt else "segresult"
        cv2.imwrite("{}/batch_{}_{}_{}_{}.png".format(save_dir, epoch, index, branch, suffix), img)
    return img
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
    """Rescale xyxy ``coords`` in place from ``img1_shape`` to ``img0_shape``.

    Args:
        img1_shape: (height, width) the coordinates currently refer to.
        coords: Box tensor with x1, y1, x2, y2 in its first four columns.
        img0_shape: (height, width, ...) of the target image.
        ratio_pad: Optional ``((gain, ...), (pad_x, pad_y))``. When omitted,
            gain and padding are derived from the two shapes.

    Returns:
        ``coords`` itself, shifted, scaled and clipped to ``img0_shape``.
    """
    if ratio_pad is not None:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]
    else:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad_x = (img1_shape[1] - img0_shape[1] * gain) / 2
        pad_y = (img1_shape[0] - img0_shape[0] * gain) / 2
        pad = pad_x, pad_y  # wh padding

    coords[:, [0, 2]] -= pad[0]  # x padding
    coords[:, [1, 3]] -= pad[1]  # y padding
    coords[:, :4] /= gain
    clip_coords(coords, img0_shape)
    return coords
def clip_coords(boxes, img_shape):
    """Clamp xyxy ``boxes`` in place to an image of shape (height, width, ...).

    Columns 0 and 2 (x1, x2) are limited to ``[0, width]``; columns 1 and 3
    (y1, y2) to ``[0, height]``. Nothing is returned.
    """
    height, width = img_shape[0], img_shape[1]
    for column, upper in enumerate((width, height, width, height)):
        boxes[:, column].clamp_(0, upper)
def plot_one_box(x, img, color=None, label=None, line_thickness=None):
    """Draw one bounding box on ``img`` in place.

    Args:
        x: Box as (x1, y1, x2, y2); each value is truncated with ``int``.
        img: Image array to draw on.
        color: Rectangle colour. When ``None`` a random colour is picked; the
            default previously reached ``cv2.rectangle`` as ``None``.
        label: Accepted for interface compatibility; no text is drawn.
        line_thickness: Line thickness. When falsy it is derived from the
            image size.
    """
    tl = line_thickness or round(0.0001 * (img.shape[0] + img.shape[1]) / 2) + 1  # line thickness
    if color is None:
        color = [random.randint(0, 255) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
def _seg_mask(seg_out, pad_h, pad_w, height, width):
    """Crop the letterbox padding from a segmentation output, upsample it 2x and take the argmax."""
    cropped = seg_out[:, :, pad_h:(height - pad_h), pad_w:(width - pad_w)]
    upsampled = torch.nn.functional.interpolate(cropped, scale_factor=int(1 / 0.5), mode='bilinear')
    _, mask = torch.max(upsampled, 1)
    return mask.int().squeeze().cpu().numpy()


def detect(path, task, thickness, color, alpha):
    """Run the network on ``path`` and return the annotated result.

    Args:
        path: Default for ``--source`` (file, folder or glob).
        task: Container tested with ``in`` for 'Driving area segmentation',
            'Lane detection' and 'Vehicle detection'; tasks not listed are
            not drawn.
        thickness: Line thickness for detection boxes.
        color: Indexable colours: ``color[0]`` (reversed) for boxes, and the
            whole object is forwarded to ``show_seg_result`` for the masks.
        alpha: ``alpha[0]`` and ``alpha[1]`` are the overlay weights of the
            driving-area and lane-line masks.

    Returns:
        A ``PIL.Image`` of the last processed frame with channels reversed.

    Note:
        Options are read with ``argparse`` from ``sys.argv``; the defaults
        above apply when no command-line flags are given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', type=str, default='weights/End-to-end.pth', help='model.pth path(s)')
    parser.add_argument('--source', type=str, default=path, help='file/folder ex:inference/images')
    parser.add_argument('--img-size', type=int, default=640, help='inference size (pixels)')
    parser.add_argument('--conf-thres', type=float, default=0.25, help='object confidence threshold')
    parser.add_argument('--iou-thres', type=float, default=0.45, help='IOU threshold for NMS')
    parser.add_argument('--device', default='cpu', help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
    parser.add_argument('--save_dir', type=str, default='runs/detect', help='directory to save results')
    parser.add_argument('--original_shape', default=True, help='maintain original shape')
    parser.add_argument('--show_detect_label', default=False, help='show detect labels or not')
    opt = parser.parse_args()

    device = select_device(device=opt.device)
    half = device.type != 'cpu'  # half precision only supported on CUDA
    inf_time = AverageMeter()
    nms_time = AverageMeter()

    # Load model
    model = get_net(opt)
    checkpoint = torch.load(opt.weights, map_location=device)
    model.load_state_dict(checkpoint['state_dict'])
    model = model.to(device)
    if half:
        model.half()  # to FP16
    # Switch to eval mode BEFORE any forward pass: the warm-up used to run in
    # training mode, which updated the BatchNorm running statistics.
    model.eval()

    # Set Dataloader
    dataset = LoadImages(opt.source, img_size=opt.img_size)

    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.ToTensor(),
        normalize
    ])

    img_det = None
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        if device.type != 'cpu':
            warmup = torch.zeros((1, 3, opt.img_size, opt.img_size), device=device)
            model(warmup.half() if half else warmup)  # run once

        for path, img, img_det, vid_cap, shapes in tqdm(dataset, total=len(dataset)):
            img = transform(img).to(device)
            img = img.half() if half else img.float()  # uint8 to fp16/32
            if img.ndimension() == 3:
                img = img.unsqueeze(0)

            # Inference
            t1 = time_synchronized()
            det_out, da_seg_out, ll_seg_out = model(img)
            t2 = time_synchronized()
            inf_out, _ = det_out
            inf_time.update(t2 - t1, img.size(0))

            # Apply NMS
            t3 = time_synchronized()
            det_pred = non_max_suppression(inf_out, conf_thres=opt.conf_thres, iou_thres=opt.iou_thres, classes=None,
                                           agnostic=False)
            t4 = time_synchronized()
            nms_time.update(t4 - t3, img.size(0))
            det = det_pred[0]

            _, _, height, width = img.shape
            pad_w, pad_h = shapes[1][1]
            pad_w = int(pad_w)
            pad_h = int(pad_h)

            da_seg_mask = _seg_mask(da_seg_out, pad_h, pad_w, height, width)
            ll_seg_mask = _seg_mask(ll_seg_out, pad_h, pad_w, height, width)

            # Blank the masks of tasks that were not requested.
            if 'Driving area segmentation' not in task:
                da_seg_mask[:] = 0
            if 'Lane detection' not in task:
                ll_seg_mask[:] = 0

            # index/epoch are only read in non-demo mode, so None is passed.
            img_det = show_seg_result(img_det, (da_seg_mask, ll_seg_mask), None, None, is_demo=True, color=color,
                                      alpha_da=alpha[0], alpha_ll=alpha[1])

            if 'Vehicle detection' in task and det is not None and len(det):
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], img_det.shape).round()
                for *xyxy, conf, cls in reversed(det):
                    plot_one_box(xyxy, img_det, label=None, color=color[0][::-1], line_thickness=thickness)

            if opt.original_shape:
                # shapes[0] is the (height, width) recorded by LoadImages before
                # any resizing; the former 384/640 constants only matched the
                # default 640 letterbox and could truncate by one pixel.
                ori_height, ori_width = shapes[0]
                img_det = cv2.resize(img_det, (int(ori_width), int(ori_height)), interpolation=cv2.INTER_LINEAR)

    return Image.fromarray(img_det[:, :, ::-1])