from math import floor

import cv2

from heartbeats import *
from python_core import *


class MatteAnalyzer:
    """Analyzes a matte (segmentation-mask) video to track the subject's bounding rect.

    Optionally cross-references heartbeat metadata (per-frame face/body rects and
    timestamps) and can write an annotated "transparent" preview video.
    """

    def __init__(self, heartbeats_path, video_path, matte_path, transparent_video_path,
                 opencv_encoding=False):
        """Set up video info, heartbeat data (if the file exists) and the optional encoder.

        :param heartbeats_path: path to the heartbeat metadata file; may not exist,
            in which case heartbeat-dependent analysis is skipped.
        :param video_path: path of the original video (also used to derive the camera id).
        :param matte_path: path of the matte video that ``analyze`` will read.
        :param transparent_video_path: output path for the annotated preview video,
            or None to disable writing it.
        :param opencv_encoding: when True, forces CPU (OpenCV) encoding instead of GPU.
        """
        self.video_info = VideoInfo(video_path)

        self.heartbeats = None
        if FileExists(heartbeats_path):
            self.heartbeats = Heartbeats_Init(heartbeats_path)
            if not Heartbeats_IsValid(self.heartbeats):
                log.fatal("Could not load heartbeat file => {:s}.".format(self.heartbeats.errorResult))
            else:
                self.camera_id = CameraId_InitFromFileName(video_path)
                # Probe frame 0 once to learn the heartbeat frame-index offset.
                _, _, _, self.hb_frame_offset = self.__get_face_body_rects(0)

        self.transparent_video = transparent_video_path is not None
        self.transparent_video_encoder = None
        self.video_path = video_path
        self.matte_path = matte_path
        if self.transparent_video:
            self.transparent_video_encoder = CreateVideoWriter(
                video_path=transparent_video_path,
                frame_rate=self.video_info.frame_rate,
                width=self.video_info.width,
                height=self.video_info.height,
                monochrome_video=True,
                encoding_bframes=0,
                cleanup=True,
                gpu_if_available=not opencv_encoding)

        self.json_output = []
        self.thread = None

        # Union of subject rects over all frames analyzed so far.
        self._bounding_rect = None
        # Subject rect of the most recently analyzed frame.
        self._frame_bounding_rect = None

    # ------------------------------------------------------------------------------------------------------------------
    def bounding_rect(self):
        """Return the accumulated (union) subject bounding rect, or None before analysis."""
        return self._bounding_rect

    # ------------------------------------------------------------------------------------------------------------------
    def prune_contours(self, conts, width, height):
        """Pick the contour with the largest bounding-box area and return it normalized.

        :param conts: contours as returned by ``cv2.findContours``.
        :param width: width (in pixels) of the image the contours came from.
        :param height: height (in pixels) of the image the contours came from.
        :return: a NormalizedRect; a 1x1 rect at the origin when no contours exist.
        """
        max_box = None
        max_area = -1
        for cont in conts:
            x, y, w, h = cv2.boundingRect(cont)
            if w * h > max_area:
                max_area = w * h
                max_box = (x, y, w, h)

        pixel_rect = PixelRect(0, 0, 1, 1)
        if max_box is not None:
            pixel_rect = PixelRect(*max_box)
        return pixel_rect.to_normalized_rect(width, height)
------------------------------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------------------------------ def estimate_subject_boundary(self, gray, res_fact=2.67): h, w = gray.shape[:2] sw = floor(w/res_fact) sh = floor(h/res_fact) # Estimates subject boundary in a frame gray = cv2.resize(gray.copy(), (sw, sh)) conts, heir = cv2.findContours(gray, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) subject_rect = self.prune_contours(conts, sw, sh) return subject_rect # ------------------------------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------------------------------ def __get_face_body_rects(self, idx): # Get face and body rect from heartbeats. camera_hb = Heartbeats_GetCameraHeartbeatAtFrameidx(self.heartbeats, self.camera_id, idx) if camera_hb.faceRectsCount == 0: return None, None, camera_hb.time, camera_hb.frameIndex face_rect = camera_hb.faceRects face_rect = NormalizedRect(face_rect.x, face_rect.y, face_rect.width, face_rect.height) body_rect = camera_hb.bodyRects body_rect = NormalizedRect(body_rect.x, body_rect.y, body_rect.width, body_rect.height) return face_rect, body_rect, camera_hb.time, camera_hb.frameIndex # ------------------------------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------------------------------ def construct_json(self, hb_timestamp, subject_rect: NormalizedRect): if subject_rect is None: dumper = { "time": hb_timestamp, "body": [] } else: dumper = { "time": hb_timestamp, "body": [{ "y": subject_rect.min_y, "w": subject_rect.width, "x": subject_rect.min_x, "h": subject_rect.height }] } return dumper # 
------------------------------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------------------------------ def display_rects(self, frame, face_rect, body_rect, subject_rect: NormalizedRect, width=1920, height=1440): if face_rect is not None: face_rect = face_rect.to_pixel_rect(width, height) cv2.rectangle(frame, (face_rect.min_x, face_rect.min_y), (face_rect.max_x, face_rect.max_y), (0, 255, 0), thickness=1) if body_rect is not None: body_rect = body_rect.to_pixel_rect(width, height) cv2.rectangle(frame, (body_rect.min_x, body_rect.min_y), (body_rect.max_x, body_rect.max_y), (255, 0, 0), thickness=1) if subject_rect is not None: subject_rect = subject_rect.to_pixel_rect(width, height) cv2.rectangle(frame, (subject_rect.min_x, subject_rect.min_y), (subject_rect.max_x, subject_rect.max_y), (0, 0, 255), thickness=1) return frame # ------------------------------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------------------------------ def sanity_test_hb(self, start_frame, frames_count): heartbeats_count = 0 last_frame_idx = 0 for idx in range(start_frame, frames_count): camera_hb = Heartbeats_GetCameraHeartbeatAtFrameidx(self.heartbeats, self.camera_id, idx) hb_frame_idx = camera_hb.frameIndex if last_frame_idx != hb_frame_idx: last_frame_idx = hb_frame_idx heartbeats_count += 1 dumper_count = len(self.json_output) if heartbeats_count != dumper_count: log.warning("Number of face rects are not equal to subject rects [hb: {:d}, VS cv: {:d}]".format(heartbeats_count, dumper_count)) # ------------------------------------------------------------------------------------------------------------------ def analyze(self, start_frame, frame_count): video_reader = CreateVideoReader(self.matte_path) video_reader.seek(start_frame) for 
frame_idx in range(frame_count): frame = video_reader.read() # In accordance with matte analyzer gray = cv2.cvtColor(frame.copy(), cv2.COLOR_RGB2GRAY) self.__analyze_matte(gray, frame_idx) if self.heartbeats is not None: self.__analyze_matte_hb(gray, frame_idx) # ------------------------------------------------------------------------------------------------------------------ def __analyze_matte(self, frame, frame_idx): # frames_count = frames.shape[0] # for i in range(frames_count): # frame = frames[i, :, :] self._frame_bounding_rect = self.estimate_subject_boundary(frame) if self._bounding_rect is None: self._bounding_rect = self._frame_bounding_rect else: self._bounding_rect.union(self._frame_bounding_rect) # ------------------------------------------------------------------------------------------------------------------ def __analyze_matte_hb(self, frame, frame_idx): # cv_frame_idx = (frame_idx - frames_count) + i + 1 # Compute frame index according to batch size and Main loop face_rect, body_rect, hb_timestamp, hb_frame_idx = self.__get_face_body_rects(frame_idx) # cv_frame_idx += self.hb_frame_offset # Offset timestamp from heartbeats frame_idx_matches = frame_idx == hb_frame_idx # If frame index matches with that of heartbeats, add subject boundary/rect to json if frame_idx_matches: dumper = self.construct_json(hb_timestamp, self._frame_bounding_rect) self.json_output.append(dumper) # Display subject boundary/rect to every frame if transparent_video is to be written if self.transparent_video: segmap_frame = cv2.cvtColor(frame.copy(), cv2.COLOR_GRAY2RGB) empty_frame = self.display_rects(segmap_frame, face_rect, body_rect, self._frame_bounding_rect, width=self.video_info.width, height=self.video_info.height ) self.transparent_video_encoder.write(empty_frame) # ------------------------------------------------------------------------------------------------------------------ # def analyze_matte_parallel(self, frames, frame_idx, frame_width, frame_height): # if 
self.thread is not None: # self.thread.join() # self.thread = Thread(target=self.analyze_matte, args=(frames, frame_idx, frame_width, frame_height)) # self.thread.start() # ------------------------------------------------------------------------------------------------------------------