import math
import os
from copy import deepcopy
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import torch
from mivolo.data.misc import aggregate_votes_winsorized, assign_faces, box_iou, cropout_black_parts
from ultralytics.engine.results import Results
from ultralytics.utils.plotting import Annotator, colors

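# ultralytics may set CUBLAS_WORKSPACE_CONFIG during import; unset it so the setting
# does not leak into the rest of the process.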
os.unsetenv("CUBLAS_WORKSPACE_CONFIG")

AGE_GENDER_TYPE = Tuple[float, str]


class PersonAndFaceCrops:
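    """Crops of detected persons and faces, grouped by whether a face-person match exists."""
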
    def __init__(self):
        # person index in yolo results -> person crop (persons with an assigned face)
        self.crops_persons: Dict[int, np.ndarray] = {}

        # face index in yolo results -> face crop (faces with an assigned person)
        self.crops_faces: Dict[int, np.ndarray] = {}

        # face index in yolo results -> face crop (faces without an assigned person)
        self.crops_faces_wo_body: Dict[int, np.ndarray] = {}

        # person index in yolo results -> person crop (persons without an assigned face)
        self.crops_persons_wo_face: Dict[int, np.ndarray] = {}

    def _add_to_output(
        self, crops: Dict[int, np.ndarray], out_crops: List[np.ndarray], out_crop_inds: List[Optional[int]]
    ):
        inds_to_add = list(crops.keys())
        crops_to_add = list(crops.values())
        out_crops.extend(crops_to_add)
        out_crop_inds.extend(inds_to_add)

    def _get_all_faces(
        self, use_persons: bool, use_faces: bool
    ) -> Tuple[List[Optional[int]], List[Optional[np.ndarray]]]:
        """
        Returns
            if use_persons and use_faces:
                faces: faces_with_bodies + faces_without_bodies + [None] * len(crops_persons_wo_face)
            if use_persons and not use_faces:
                faces: [None] * n_persons
            if not use_persons and use_faces:
                faces: faces_with_bodies + faces_without_bodies
        """

        def add_none_to_output(faces_inds, faces_crops, num):
            faces_inds.extend([None for _ in range(num)])
            faces_crops.extend([None for _ in range(num)])

        faces_inds: List[Optional[int]] = []
        faces_crops: List[Optional[np.ndarray]] = []

        if not use_faces:
            add_none_to_output(faces_inds, faces_crops, len(self.crops_persons) + len(self.crops_persons_wo_face))
            return faces_inds, faces_crops

        self._add_to_output(self.crops_faces, faces_crops, faces_inds)
        self._add_to_output(self.crops_faces_wo_body, faces_crops, faces_inds)

        if use_persons:
            add_none_to_output(faces_inds, faces_crops, len(self.crops_persons_wo_face))

        return faces_inds, faces_crops

    def _get_all_bodies(
        self, use_persons: bool, use_faces: bool
    ) -> Tuple[List[Optional[int]], List[Optional[np.ndarray]]]:
        """
        Returns
            if use_persons and use_faces:
                persons: bodies_with_faces + [None] * len(faces_without_bodies) + bodies_without_faces
            if use_persons and not use_faces:
                persons: bodies_with_faces + bodies_without_faces
            if not use_persons and use_faces:
                persons: [None] * n_faces
        """

        def add_none_to_output(bodies_inds, bodies_crops, num):
            bodies_inds.extend([None for _ in range(num)])
            bodies_crops.extend([None for _ in range(num)])

        bodies_inds: List[Optional[int]] = []
        bodies_crops: List[Optional[np.ndarray]] = []

        if not use_persons:
            add_none_to_output(bodies_inds, bodies_crops, len(self.crops_faces) + len(self.crops_faces_wo_body))
            return bodies_inds, bodies_crops

        self._add_to_output(self.crops_persons, bodies_crops, bodies_inds)
        if use_faces:
            add_none_to_output(bodies_inds, bodies_crops, len(self.crops_faces_wo_body))

        self._add_to_output(self.crops_persons_wo_face, bodies_crops, bodies_inds)

        return bodies_inds, bodies_crops

    def get_faces_with_bodies(self, use_persons: bool, use_faces: bool):
        """
        Return
            faces: faces_with_bodies, faces_without_bodies, [None] * len(crops_persons_wo_face)
            persons: bodies_with_faces, [None] * len(faces_without_bodies), bodies_without_faces
        """

        bodies_inds, bodies_crops = self._get_all_bodies(use_persons, use_faces)
        faces_inds, faces_crops = self._get_all_faces(use_persons, use_faces)

        return (bodies_inds, bodies_crops), (faces_inds, faces_crops)

    def save(self, out_dir="output"):
        ind = 0
        os.makedirs(out_dir, exist_ok=True)
        for crops in [self.crops_persons, self.crops_faces, self.crops_faces_wo_body, self.crops_persons_wo_face]:
            for crop in crops.values():
                if crop is None:
                    continue
                out_name = os.path.join(out_dir, f"{ind}_crop.jpg")
                cv2.imwrite(out_name, crop)
                ind += 1


class PersonAndFaceResult:
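    """Detection results for persons and faces plus per-object age/gender predictions."""
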
    def __init__(self, results: Results):
        self.yolo_results = results
        names = set(results.names.values())
        assert "person" in names and "face" in names

        # face index -> person index; filled by associate_faces_with_persons()
        self.face_to_person_map: Dict[int, Optional[int]] = {ind: None for ind in self.get_bboxes_inds("face")}
        self.unassigned_persons_inds: List[int] = self.get_bboxes_inds("person")
        n_objects = len(self.yolo_results.boxes)
        self.ages: List[Optional[float]] = [None for _ in range(n_objects)]
        self.genders: List[Optional[str]] = [None for _ in range(n_objects)]
        self.gender_scores: List[Optional[float]] = [None for _ in range(n_objects)]

    @property
    def n_objects(self) -> int:
        return len(self.yolo_results.boxes)

    def get_bboxes_inds(self, category: str) -> List[int]:
        bboxes: List[int] = []
        for ind, det in enumerate(self.yolo_results.boxes):
            name = self.yolo_results.names[int(det.cls)]
            if name == category:
                bboxes.append(ind)

        return bboxes

    def get_distance_to_center(self, bbox_ind: int) -> float:
        """
        Calculate the Euclidean distance between the bbox center and the image center.
        """
        im_h, im_w = self.yolo_results[bbox_ind].orig_shape
        x1, y1, x2, y2 = self.get_bbox_by_ind(bbox_ind).cpu().numpy()
        center_x, center_y = (x1 + x2) / 2, (y1 + y2) / 2
        dist = math.dist([center_x, center_y], [im_w / 2, im_h / 2])
        return dist

    def plot(
        self,
        conf=False,
        line_width=None,
        font_size=None,
        font="Arial.ttf",
        pil=False,
        img=None,
        labels=True,
        boxes=True,
        probs=True,
        ages=True,
        genders=True,
        gender_probs=False,
    ):
        """
        Plot the detection results on an input RGB image. Accepts a numpy array (cv2) or a PIL Image.
        Args:
            conf (bool): Whether to plot the detection confidence score.
            line_width (float, optional): The line width of the bounding boxes. If None, it is scaled to the image size.
            font_size (float, optional): The font size of the text. If None, it is scaled to the image size.
            font (str): The font to use for the text.
            pil (bool): Whether to return the image as a PIL Image.
            img (numpy.ndarray): Plot on another image. If None, plot on the original image.
            labels (bool): Whether to plot the labels of bounding boxes.
            boxes (bool): Whether to plot the bounding boxes.
            probs (bool): Whether to plot classification probabilities.
            ages (bool): Whether to plot the ages of bounding boxes.
            genders (bool): Whether to plot the genders of bounding boxes.
            gender_probs (bool): Whether to plot gender classification probabilities.
        Returns:
            (numpy.ndarray): A numpy array of the annotated image.
        """

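        # one palette color per face-person pair; unmatched faces get color 0,
        # persons without a face get color 1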
        colors_by_ind = {}
        for face_ind, person_ind in self.face_to_person_map.items():
            if person_ind is not None:
                colors_by_ind[face_ind] = face_ind + 2
                colors_by_ind[person_ind] = face_ind + 2
            else:
                colors_by_ind[face_ind] = 0
        for person_ind in self.unassigned_persons_inds:
            colors_by_ind[person_ind] = 1

        names = self.yolo_results.names
        annotator = Annotator(
            deepcopy(self.yolo_results.orig_img if img is None else img),
            line_width,
            font_size,
            font,
            pil,
            example=names,
        )
        pred_boxes, show_boxes = self.yolo_results.boxes, boxes
        pred_probs, show_probs = self.yolo_results.probs, probs

        if pred_boxes and show_boxes:
            for bb_ind, (d, age, gender, gender_score) in enumerate(
                zip(pred_boxes, self.ages, self.genders, self.gender_scores)
            ):
                c, conf, guid = int(d.cls), float(d.conf) if conf else None, None if d.id is None else int(d.id.item())
                name = ("" if guid is None else f"id:{guid} ") + names[c]
                label = (f"{name} {conf:.2f}" if conf else name) if labels else None
                # only append age/gender info when a label is drawn at all
                if label is not None:
                    if ages and age is not None:
                        label += f" {age:.1f}"
                    if genders and gender is not None:
                        label += f" {'F' if gender == 'female' else 'M'}"
                    if gender_probs and gender_score is not None:
                        label += f" ({gender_score:.1f})"
                annotator.box_label(d.xyxy.squeeze(), label, color=colors(colors_by_ind[bb_ind], True))

        if pred_probs is not None and show_probs:
            text = f"{', '.join(f'{names[j] if names else j} {pred_probs.data[j]:.2f}' for j in pred_probs.top5)}, "
            annotator.text((32, 32), text, txt_color=(255, 255, 255))

        return annotator.result()

    def set_tracked_age_gender(self, tracked_objects: Dict[int, List[AGE_GENDER_TYPE]]):
        """
        Update age and gender for objects based on the history from tracked_objects.
        Args:
            tracked_objects (dict[int, list[AGE_GENDER_TYPE]]): info about tracked objects by guid
        """

        for face_ind, person_ind in self.face_to_person_map.items():
            pguid = self._get_id_by_ind(person_ind)
            fguid = self._get_id_by_ind(face_ind)

            if fguid == -1 and pguid == -1:
                # the tracker failed to assign an id to both the face and the person
                continue
            age, gender = self._gather_tracking_result(tracked_objects, fguid, pguid)
            if age is None or gender is None:
                continue
            self.set_age(face_ind, age)
            self.set_gender(face_ind, gender, 1.0)
            if pguid != -1:
                self.set_gender(person_ind, gender, 1.0)
                self.set_age(person_ind, age)

        for person_ind in self.unassigned_persons_inds:
            pid = self._get_id_by_ind(person_ind)
            if pid == -1:
                continue
            age, gender = self._gather_tracking_result(tracked_objects, -1, pid)
            if age is None or gender is None:
                continue
            self.set_gender(person_ind, gender, 1.0)
            self.set_age(person_ind, age)

    def _get_id_by_ind(self, ind: Optional[int] = None) -> int:
        if ind is None:
            return -1
        obj_id = self.yolo_results.boxes[ind].id
        if obj_id is None:
            return -1
        return int(obj_id.item())

    def get_bbox_by_ind(self, ind: int, im_h: Optional[int] = None, im_w: Optional[int] = None) -> torch.Tensor:
        bb = self.yolo_results.boxes[ind].xyxy.squeeze().type(torch.int32)
        if im_h is not None and im_w is not None:
            bb[0] = torch.clamp(bb[0], min=0, max=im_w - 1)
            bb[1] = torch.clamp(bb[1], min=0, max=im_h - 1)
            bb[2] = torch.clamp(bb[2], min=0, max=im_w - 1)
            bb[3] = torch.clamp(bb[3], min=0, max=im_h - 1)
        return bb

    def set_age(self, ind: Optional[int], age: float):
        if ind is not None:
            self.ages[ind] = age

    def set_gender(self, ind: Optional[int], gender: str, gender_score: float):
        if ind is not None:
            self.genders[ind] = gender
            self.gender_scores[ind] = gender_score

    @staticmethod
    def _gather_tracking_result(
        tracked_objects: Dict[int, List[AGE_GENDER_TYPE]],
        fguid: int = -1,
        pguid: int = -1,
        minimum_sample_size: int = 10,
    ) -> AGE_GENDER_TYPE:

        assert fguid != -1 or pguid != -1, "Incorrect tracking behaviour"

        face_ages = [r[0] for r in tracked_objects[fguid] if r[0] is not None] if fguid in tracked_objects else []
        face_genders = [r[1] for r in tracked_objects[fguid] if r[1] is not None] if fguid in tracked_objects else []
        person_ages = [r[0] for r in tracked_objects[pguid] if r[0] is not None] if pguid in tracked_objects else []
        person_genders = [r[1] for r in tracked_objects[pguid] if r[1] is not None] if pguid in tracked_objects else []

        if not face_ages and not person_ages:
            return None, None

        # Age aggregation: with enough history, take a winsorized vote over all face and
        # person predictions; otherwise average the face and person means, falling back
        # to whichever of the two is available.
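        # E.g. face_ages=[30.5, 31.0] and person_ages=[33.5] with fewer than
        # minimum_sample_size samples total gives age = (30.75 + 33.5) / 2 = 32.125.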
        if len(person_ages + face_ages) >= minimum_sample_size:
            age = aggregate_votes_winsorized(person_ages + face_ages)
        else:
            face_age = np.mean(face_ages) if face_ages else None
            person_age = np.mean(person_ages) if person_ages else None
            if face_age is None:
                face_age = person_age
            if person_age is None:
                person_age = face_age
            age = (face_age + person_age) / 2.0

        genders = face_genders + person_genders
        assert len(genders) > 0
        # the most frequent gender label in the track history wins
        gender = max(set(genders), key=genders.count)

        return age, gender

    def get_results_for_tracking(self) -> Tuple[Dict[int, AGE_GENDER_TYPE], Dict[int, AGE_GENDER_TYPE]]:
        """
        Get tracked persons and faces from the current frame, keyed by track id (guid).
        """
        persons: Dict[int, AGE_GENDER_TYPE] = {}
        faces: Dict[int, AGE_GENDER_TYPE] = {}

        names = self.yolo_results.names
        pred_boxes = self.yolo_results.boxes
        for det, age, gender in zip(pred_boxes, self.ages, self.genders):
            if det.id is None:
                continue
            cat_id, guid = int(det.cls), int(det.id.item())
            name = names[cat_id]
            if name == "person":
                persons[guid] = (age, gender)
            elif name == "face":
                faces[guid] = (age, gender)

        return persons, faces

    def associate_faces_with_persons(self):
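        """
        Match each face bbox to a person bbox via assign_faces() and record
        persons left without a face in self.unassigned_persons_inds.
        """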
        face_bboxes_inds: List[int] = self.get_bboxes_inds("face")
        person_bboxes_inds: List[int] = self.get_bboxes_inds("person")

        face_bboxes: List[torch.Tensor] = [self.get_bbox_by_ind(ind) for ind in face_bboxes_inds]
        person_bboxes: List[torch.Tensor] = [self.get_bbox_by_ind(ind) for ind in person_bboxes_inds]

        self.face_to_person_map = {ind: None for ind in face_bboxes_inds}
        assigned_faces, unassigned_persons_inds = assign_faces(person_bboxes, face_bboxes)

        for face_ind, person_ind in enumerate(assigned_faces):
            face_ind = face_bboxes_inds[face_ind]
            person_ind = person_bboxes_inds[person_ind] if person_ind is not None else None
            self.face_to_person_map[face_ind] = person_ind

        self.unassigned_persons_inds = [person_bboxes_inds[person_ind] for person_ind in unassigned_persons_inds]

    def crop_object(
        self, full_image: np.ndarray, ind: int, cut_other_classes: Optional[List[str]] = None
    ) -> Optional[np.ndarray]:
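        """
        Crop the object `ind` out of `full_image`. If cut_other_classes is given,
        intersecting objects of those classes are blacked out inside the crop.
        Returns None if the crop is too small or mostly blacked out.
        """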
        IOU_THRESH = 0.000001  # effectively: any intersection at all
        MIN_PERSON_CROP_AFTERCUT_RATIO = 0.4  # drop the crop if less than 40% of it survives the cuts
        CROP_ROUND_RATE = 0.3  # snap cut rectangles to crop borders when within 30% of them
        MIN_PERSON_SIZE = 50  # minimum person crop side, in pixels

        obj_bbox = self.get_bbox_by_ind(ind, *full_image.shape[:2])
        x1, y1, x2, y2 = obj_bbox
        cur_cat = self.yolo_results.names[int(self.yolo_results.boxes[ind].cls)]
        # get the object crop out of the full image
        obj_image = full_image[y1:y2, x1:x2].copy()
        crop_h, crop_w = obj_image.shape[:2]

        if cur_cat == "person" and (crop_h < MIN_PERSON_SIZE or crop_w < MIN_PERSON_SIZE):
            return None

        if not cut_other_classes:
            return obj_image

        # IoU of the object bbox against every detected bbox
        other_bboxes: List[torch.Tensor] = [
            self.get_bbox_by_ind(other_ind, *full_image.shape[:2]) for other_ind in range(len(self.yolo_results.boxes))
        ]

        iou_matrix = box_iou(torch.stack([obj_bbox]), torch.stack(other_bboxes)).cpu().numpy()[0]

        # black out every intersecting object of the requested classes
        for other_ind, (det, iou) in enumerate(zip(self.yolo_results.boxes, iou_matrix)):
            other_cat = self.yolo_results.names[int(det.cls)]
            if ind == other_ind or iou < IOU_THRESH or other_cat not in cut_other_classes:
                continue
            o_x1, o_y1, o_x2, o_y2 = det.xyxy.squeeze().type(torch.int32)

            # translate to coordinates local to the crop
            o_x1 = max(o_x1 - x1, 0)
            o_y1 = max(o_y1 - y1, 0)
            o_x2 = min(o_x2 - x1, crop_w)
            o_y2 = min(o_y2 - y1, crop_h)

            # for non-face objects, snap the cut rectangle to nearby crop borders
            if other_cat != "face":
                if (o_y1 / crop_h) < CROP_ROUND_RATE:
                    o_y1 = 0
                if ((crop_h - o_y2) / crop_h) < CROP_ROUND_RATE:
                    o_y2 = crop_h
                if (o_x1 / crop_w) < CROP_ROUND_RATE:
                    o_x1 = 0
                if ((crop_w - o_x2) / crop_w) < CROP_ROUND_RATE:
                    o_x2 = crop_w

            obj_image[o_y1:o_y2, o_x1:o_x2] = 0

        obj_image, remain_ratio = cropout_black_parts(obj_image, CROP_ROUND_RATE)
        if remain_ratio < MIN_PERSON_CROP_AFTERCUT_RATIO:
            return None

        return obj_image

    def collect_crops(self, image) -> PersonAndFaceCrops:
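        """
        Cut a face crop and a person crop for every detected object, grouped by
        whether the face-person association succeeded.
        """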
        crops_data = PersonAndFaceCrops()
        for face_ind, person_ind in self.face_to_person_map.items():
            face_image = self.crop_object(image, face_ind, cut_other_classes=[])

            if person_ind is None:
                crops_data.crops_faces_wo_body[face_ind] = face_image
                continue

            person_image = self.crop_object(image, person_ind, cut_other_classes=["face", "person"])

            crops_data.crops_faces[face_ind] = face_image
            crops_data.crops_persons[person_ind] = person_image

        for person_ind in self.unassigned_persons_inds:
            person_image = self.crop_object(image, person_ind, cut_other_classes=["face", "person"])
            crops_data.crops_persons_wo_face[person_ind] = person_image

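        # crops_data.save() can be called here to dump all collected crops to disk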
        return crops_data
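

# Minimal usage sketch (hypothetical, not part of the module): assumes an ultralytics
# YOLO checkpoint trained on "person" and "face" classes, as MiVOLO's detector is;
# the weights path below is illustrative only.
#
#   import cv2
#   from ultralytics import YOLO
#
#   model = YOLO("yolov8x_person_face.pt")  # hypothetical checkpoint name
#   image = cv2.imread("photo.jpg")
#   detected = PersonAndFaceResult(model.predict(image, verbose=False)[0])
#   detected.associate_faces_with_persons()
#   crops = detected.collect_crops(image)
#   (bodies, faces) = crops.get_faces_with_bodies(use_persons=True, use_faces=True)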