import spaces  # ZeroGPU Spaces expect this import before CUDA is initialized

import math
import random
from argparse import ArgumentParser
from collections import deque

import cv2
import numpy as np
import torch
from torch import nn

from ultralytics import YOLO
from ultralytics.engine.results import Results
from ultralytics.models.yolo.detect import DetectionValidator
from ultralytics.nn.autobackend import AutoBackend
from ultralytics.utils import nms

# from YoloVal import DetectionValidatorEnsemble
def do_rectangles_overlap(rect1, rect2, overlap_threshold=0.5):
    """Return True if two xyxy rectangles overlap by at least `overlap_threshold`
    of either rectangle's area, or if one rectangle fully contains the other."""
    # Rect1 coords
    x1_min, y1_min, x1_max, y1_max = rect1
    # Rect2 coords
    x2_min, y2_min, x2_max, y2_max = rect2

    # Check if one rectangle is entirely to the left of the other
    if x1_max < x2_min or x2_max < x1_min:
        return False
    # Check if one rectangle is entirely above the other
    if y1_max < y2_min or y2_max < y1_min:
        return False

    # Areas of both rectangles
    area_rect1 = (x1_max - x1_min) * (y1_max - y1_min)
    area_rect2 = (x2_max - x2_min) * (y2_max - y2_min)

    # Coordinates of the intersection rectangle
    inter_x_min = max(x1_min, x2_min)
    inter_x_max = min(x1_max, x2_max)
    inter_y_min = max(y1_min, y2_min)
    inter_y_max = min(y1_max, y2_max)

    # Degenerate intersection (rectangles only touch at an edge or corner)
    if inter_x_max <= inter_x_min or inter_y_max <= inter_y_min:
        return False

    # Intersection area and its fraction of each rectangle's area
    inter_area = (inter_x_max - inter_x_min) * (inter_y_max - inter_y_min)
    overlap_percentage_1 = inter_area / area_rect1
    overlap_percentage_2 = inter_area / area_rect2

    # Check for complete containment of either rectangle in the other
    contained = ((x1_min <= x2_min <= x1_max and x1_min <= x2_max <= x1_max) and
                 (y1_min <= y2_min <= y1_max and y1_min <= y2_max <= y1_max)) or \
                ((x2_min <= x1_min <= x2_max and x2_min <= x1_max <= x2_max) and
                 (y2_min <= y1_min <= y2_max and y2_min <= y1_max <= y2_max))

    # Overlap counts if either fraction meets the threshold, or one box contains the other
    return overlap_percentage_1 >= overlap_threshold or overlap_percentage_2 >= overlap_threshold or contained
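# A quick sanity check of the overlap test (hypothetical coordinates): with the
# default 0.5 threshold, two boxes must share at least half of either box's
# area, or one must fully contain the other, to count as overlapping.
#   do_rectangles_overlap((0, 0, 100, 100), (20, 20, 120, 120))  # True: inter 80*80 = 6400, 6400/10000 >= 0.5
#   do_rectangles_overlap((0, 0, 100, 100), (90, 90, 200, 200))  # False: inter 10*10 = 100, well below threshold
#   do_rectangles_overlap((0, 0, 100, 100), (30, 30, 60, 60))    # True: second box fully contained in the first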
class YoloEnsemble:
    def __init__(self, weights: list[str]):
        self.models = [YOLO(weight) for weight in weights]

    def predict(self, img_path: str, conf: float = 0.25, verbose: bool = True):
        # Seed everything so repeated predictions are reproducible
        seed = 42
        torch.manual_seed(seed)
        np.random.seed(seed)
        random.seed(seed)
        torch.cuda.manual_seed_all(seed)  # no-op without CUDA; covers multi-GPU setups
        # For full reproducibility, cuDNN must also run deterministically
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

        predictions = [_model(img_path, conf=conf, verbose=verbose) for _model in self.models]
        if len(self.models) > 1:
            return self.ensemble(predictions)
        return predictions[0]
    def ensemble(self, predictions: list):
        # Concatenate every model's raw detections into one (1, N, 6) tensor,
        # keeping the metadata of the first Results object for the merged output
        hits = None
        orig_shape = None
        names = None
        orig_img = None
        path = None
        speed = None
        for results in predictions:
            for result in results:
                _hits = result.boxes.data.unsqueeze(dim=0)
                if hits is None:
                    hits = _hits
                else:
                    hits = torch.cat((hits, _hits), dim=1)
                if orig_shape is None:
                    orig_shape = result.orig_shape
                    names = result.names
                    orig_img = result.orig_img
                    path = result.path
                    speed = result.speed

        nms_hits = nms.non_max_suppression(hits, conf_thres=0.25, classes=[0, 1, 2, 3, 4, 5, 6])

        # Greedy second pass over the NMS survivors: group mutually overlapping
        # boxes (including across classes, which class-aware NMS leaves alone)
        # and keep the highest-confidence box from each group
        boxes = deque(nms_hits[0].tolist())
        non_overlapping_boxes = []
        while len(boxes) > 0:
            box = boxes.popleft()
            overlappers = [box]
            rem = []
            for i, b in enumerate(boxes):
                if do_rectangles_overlap(box[:4], b[:4]):
                    overlappers.append(b)
                    rem.append(i)
            # Delete collected boxes, offsetting each index by the count already removed
            for _i, _idx in enumerate(rem):
                del boxes[_idx - _i]
            keep_box = max(overlappers, key=lambda x: x[4])
            non_overlapping_boxes.append(keep_box)

        if len(non_overlapping_boxes) == 0:
            return [Results(names=names, orig_img=orig_img, path=path, speed=speed)]
        return [Results(boxes=torch.Tensor(non_overlapping_boxes),
                        names=names, orig_img=orig_img, path=path, speed=speed)]
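# Example usage (weight and image names taken from the commented-out demo at
# the bottom of this file):
#   model = YoloEnsemble(['./train16.pt'])
#   results = model.predict('askubuntu2.png', conf=0.7)
#   results[0].save(filename='result.jpg')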
class YoloEnsembleAutoBackend:
    """Ensemble wrapper exposing enough of the AutoBackend interface
    (stride, names, fp16, warmup, ...) to stand in for a single model
    inside an ultralytics validator."""

    def __init__(self, weights: list[str], val=False, **kwargs):
        if isinstance(weights, list):
            self.models = [YOLO(weight) for weight in weights]
        else:
            self.models = [
                AutoBackend(
                    weights=weights,
                    device=kwargs.get('device', None),
                    dnn=kwargs.get('dnn', False),
                    data=kwargs.get('data', None),
                    fp16=kwargs.get('fp16', False),
                )
            ]
        # Load the first checkpoint through AutoBackend once, purely to expose
        # the metadata attributes that validators read off the model
        first_weight = weights[0] if isinstance(weights, list) else weights
        model = AutoBackend(
            weights=first_weight,
            device=kwargs.get('device', None),
            dnn=kwargs.get('dnn', False),
            data=kwargs.get('data', None),
            fp16=kwargs.get('fp16', False),
        )
        self.device = kwargs.get('device', None)
        self.fp16 = model.fp16
        self.stride = model.stride
        self.pt = model.pt
        self.jit = model.jit
        self.engine = model.engine
        self.val = val
        self.names = model.names
    def warmup(self, imgsz=(1, 3, 640, 640)):
        # Validators call warmup(); the wrapped YOLO models handle their own setup
        pass

    def eval(self):
        for model in self.models:
            model.eval()

    def predict(self, imgs, conf=0.25, verbose=True):
        predictions = [_model(imgs, conf=conf, verbose=verbose) for _model in self.models]
        # Regroup from [per-model][per-image] into [per-image][per-model]
        predictions = [list(x) for x in zip(*predictions)]
        if len(self.models) > 1:
            return self.ensemble2(predictions)
        return predictions[0]
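    # The zip(*predictions) regrouping above transposes the nesting; with two
    # models (m) and three images (i):
    #   [[m0i0, m0i1, m0i2],           [[m0i0, m1i0],
    #    [m1i0, m1i1, m1i2]]    -->     [m0i1, m1i1],
    #                                   [m0i2, m1i2]]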
    def ensemble(self, predictions: list):
        # Greedy overlap suppression on raw prediction tensors (one per image)
        final_preds = []
        device = None
        for ip, results in enumerate(predictions):
            for ir, result in enumerate(results):
                device = result.device
                _array = deque(result.cpu().tolist())
                non_overlapping_boxes = []
                while len(_array) > 0:
                    box = _array.popleft()
                    overlappers = [box]
                    rem = []
                    for i, b in enumerate(_array):
                        if do_rectangles_overlap(box[:4], b[:4]):
                            overlappers.append(b)
                            rem.append(i)
                    for _i, _idx in enumerate(rem):
                        del _array[_idx - _i]
                    keep_box = max(overlappers, key=lambda x: x[4])
                    non_overlapping_boxes.append(keep_box)
                # Pad to a fixed 300 rows by repeating boxes, so every image
                # yields a constant-size tensor
                if len(non_overlapping_boxes) != 0:
                    repeat = int(math.ceil(300 / len(non_overlapping_boxes)))
                    non_overlapping_boxes = non_overlapping_boxes * repeat
                final_preds.append(non_overlapping_boxes[:300])
        _new_preds = torch.tensor(final_preds, device=device)
        return _new_preds
    def ensemble2(self, predictions: list):
        final_preds = []
        device = None
        # Flatten each model's Results for image i into one list of
        # [x1, y1, x2, y2, conf, cls] rows per image
        preds = []
        for ip, prediction in enumerate(predictions):  # for image i
            model_preds = []
            for ir, result in enumerate(prediction):  # model r's prediction on image i
                if device is None:
                    device = result.boxes.xyxy.device
                boxes = result.boxes.xyxy.cpu().numpy()
                if len(boxes) == 0:
                    continue
                _cls = result.boxes.cls.cpu().numpy().reshape(-1, 1)
                _conf = result.boxes.conf.cpu().numpy().reshape(-1, 1)
                boxes = np.hstack((boxes, _conf, _cls)).tolist()
                model_preds.extend(boxes)
            preds.append(model_preds)

        # Greedy cross-model overlap suppression, per image
        for ip, pred in enumerate(preds):
            _array = deque(pred)
            non_overlapping_boxes = []
            while len(_array) > 0:
                box = _array.popleft()
                overlappers = [box]
                rem = []
                for i, b in enumerate(_array):
                    if do_rectangles_overlap(box[:4], b[:4]):
                        overlappers.append(b)
                        rem.append(i)
                for _i, _idx in enumerate(rem):
                    del _array[_idx - _i]
                keep_box = max(overlappers, key=lambda x: x[4])
                non_overlapping_boxes.append(keep_box)
            # Pad to a fixed 100 rows by repeating boxes
            if len(non_overlapping_boxes) != 0:
                repeat = int(math.ceil(100 / len(non_overlapping_boxes)))
                non_overlapping_boxes = non_overlapping_boxes * repeat
            final_preds.append(non_overlapping_boxes[:100])
        _new_preds = torch.tensor(final_preds, device=device)
        return _new_preds
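# Sketch of how this wrapper might be wired up (hypothetical weight paths; a
# custom validator such as the commented-out DetectionValidatorEnsemble import
# above would be needed to consume it during validation):
#   model = YoloEnsembleAutoBackend(['./train16.pt', './train17.pt'], val=True)
#   preds = model.predict(['askubuntu2.png'])  # tensor of shape (num_images, 100, 6)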
class YoloPreprocess(nn.Module):
    """Letterbox-style preprocessing: fit the image inside 1280x1280 without
    distortion, pad to a multiple of 32 with YOLO's gray (114, 114, 114),
    then convert to a normalized BCHW array."""

    def __init__(self):
        super(YoloPreprocess, self).__init__()

    def pre_transform(self, img: np.ndarray):
        shape = img.shape[:2]  # current shape (height, width)
        new_shape = [1280, 1280]
        # Scale ratio that fits the image inside new_shape
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
        dw, dh = np.mod(dw, 32), np.mod(dh, 32)  # pad only up to the nearest multiple of 32
        dw /= 2  # split padding between the two sides
        dh /= 2
        if shape[::-1] != new_unpad:  # resize
            img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        img = cv2.copyMakeBorder(
            img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114)
        )
        return [img]

    def forward(self, im):
        im = np.stack(self.pre_transform(im))
        im = im[..., ::-1].transpose((0, 3, 1, 2))  # BGR to RGB, BHWC to BCHW, (n, 3, h, w)
        im = np.ascontiguousarray(im)  # contiguous
        _im = im / 255  # scale pixel values to [0, 1]
        return _im
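# Example usage (image name taken from the commented-out demo below):
#   img = cv2.imread('askubuntu2.png')   # HWC, BGR, uint8
#   batch = YoloPreprocess()(img)        # (1, 3, H, W) float array scaled to [0, 1]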
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--weights', nargs='+', help="Model paths", required=True)
    cli_args = parser.parse_args()  # note: unused below, where the validator model is hardcoded

    # model = YoloEnsemble(cli_args.weights)
    # results = model.predict(['askubuntu2.png'], conf=0.7)
    # for result in results:
    #     boxes = result.boxes          # Boxes object for bounding box outputs
    #     masks = result.masks          # Masks object for segmentation mask outputs
    #     keypoints = result.keypoints  # Keypoints object for pose outputs
    #     probs = result.probs          # Probs object for classification outputs
    #     result.save(filename='result.jpg')

    args = dict(model='./train16.pt', data='dataset/data.yaml')
    validator = DetectionValidator(args=args)
    validator()
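# Run with, for example (hypothetical script name; weight path from above):
#   python app.py --weights ./train16.pt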