|
|
from math import floor |
|
|
import cv2 |
|
|
|
|
|
from heartbeats import * |
|
|
from python_core import * |
|
|
|
|
|
|
|
|
class MatteAnalyzer:
    """Analyzes a matte (segmentation-mask) video to locate the subject's
    bounding rectangle per frame.

    Optionally cross-references per-frame heartbeat metadata (face/body
    detections) to emit timestamped JSON records, and can write an annotated
    debug video with the face (green), body (blue) and subject (red) rects
    drawn on each matte frame.
    """

    def __init__(self, heartbeats_path, video_path, matte_path, transparent_video_path, opencv_encoding=False):
        """Set up video info, optional heartbeat metadata, and the optional
        debug-video encoder.

        Args:
            heartbeats_path: Path to the heartbeat metadata file; may not exist.
            video_path: Path to the original camera video (used for dimensions,
                frame rate, and camera-id derivation).
            matte_path: Path to the matte (mask) video that will be analyzed.
            transparent_video_path: Output path for the annotated debug video,
                or None to disable it.
            opencv_encoding: When True, forces CPU (OpenCV) encoding instead of
                GPU encoding for the debug video.
        """
        self.video_info = VideoInfo(video_path)

        # Heartbeat metadata is optional. camera_id / hb_frame_offset are only
        # meaningful when a valid heartbeat file is present; initialize them so
        # the attributes always exist (avoids AttributeError on the no-heartbeat
        # path).
        self.heartbeats = None
        self.camera_id = None
        self.hb_frame_offset = None

        if FileExists(heartbeats_path):
            self.heartbeats = Heartbeats_Init(heartbeats_path)

            if not Heartbeats_IsValid(self.heartbeats):
                log.fatal("Could not load heartbeat file => {:s}.".format(self.heartbeats.errorResult))
            else:
                self.camera_id = CameraId_InitFromFileName(video_path)

                # Frame index reported by the very first heartbeat; the other
                # return values (rects, timestamp) are not needed here.
                _, _, _, self.hb_frame_offset = self.__get_face_body_rects(0)

        self.transparent_video = transparent_video_path is not None
        self.transparent_video_encoder = None
        self.video_path = video_path
        self.matte_path = matte_path

        if self.transparent_video:
            self.transparent_video_encoder = CreateVideoWriter(video_path=transparent_video_path,
                                                               frame_rate=self.video_info.frame_rate,
                                                               width=self.video_info.width,
                                                               height=self.video_info.height,
                                                               monochrome_video=True,
                                                               encoding_bframes=0,
                                                               cleanup=True,
                                                               gpu_if_available=(opencv_encoding == False))

        # One JSON-serializable record per matched heartbeat (see construct_json).
        self.json_output = []

        self.thread = None

        # Union of the per-frame subject rects over all frames analyzed so far.
        self._bounding_rect = None
        # Subject rect of the most recently analyzed frame.
        self._frame_bounding_rect = None

    def bounding_rect(self):
        """Return the cumulative subject bounding rect (None before analyze())."""
        return self._bounding_rect

    def prune_contours(self, conts, width, height):
        """Return the normalized bounding rect of the largest contour.

        Args:
            conts: Iterable of OpenCV contours.
            width: Width (px) of the image the contours were found in.
            height: Height (px) of the image the contours were found in.

        Returns:
            NormalizedRect of the largest-area axis-aligned bounding box, or a
            degenerate 1x1-pixel rect at the origin when `conts` is empty.
        """
        max_box = None
        max_area = -1

        for cont in conts:
            x, y, w, h = cv2.boundingRect(cont)
            if w * h > max_area:
                max_area = w * h
                max_box = (x, y, w, h)

        # Sentinel 1x1 rect when no contour was found.
        pixel_rect = PixelRect(0, 0, 1, 1)
        if max_box is not None:
            pixel_rect = PixelRect(*max_box)

        return pixel_rect.to_normalized_rect(width, height)

    def estimate_subject_boundary(self, gray, res_fact=2.67):
        """Estimate the subject's bounding rect from a grayscale matte frame.

        The frame is downscaled by `res_fact` before contour extraction to cut
        the cost of findContours; since the result is normalized, the scale
        factor cancels out.

        Args:
            gray: Single-channel matte frame (assumed already binarized —
                TODO confirm against the matte producer).
            res_fact: Downscale divisor applied to both dimensions.

        Returns:
            NormalizedRect of the largest contour's bounding box.
        """
        h, w = gray.shape[:2]
        sw = floor(w / res_fact)
        sh = floor(h / res_fact)

        # cv2.resize never modifies its input and allocates a fresh array,
        # so no defensive copy is needed.
        small = cv2.resize(gray, (sw, sh))

        conts, _ = cv2.findContours(small, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        return self.prune_contours(conts, sw, sh)

    def __get_face_body_rects(self, idx):
        """Fetch the heartbeat for frame `idx` and convert its detections.

        Returns:
            Tuple (face_rect, body_rect, timestamp, hb_frame_index). The rects
            are None when the heartbeat carries no face detections.
        """
        camera_hb = Heartbeats_GetCameraHeartbeatAtFrameidx(self.heartbeats, self.camera_id, idx)

        if camera_hb.faceRectsCount == 0:
            return None, None, camera_hb.time, camera_hb.frameIndex

        # NOTE(review): only a single face/body rect is read from
        # faceRects/bodyRects even though faceRectsCount suggests there may be
        # several — confirm whether multi-detection frames need handling.
        fr = camera_hb.faceRects
        face_rect = NormalizedRect(fr.x, fr.y, fr.width, fr.height)

        br = camera_hb.bodyRects
        body_rect = NormalizedRect(br.x, br.y, br.width, br.height)

        return face_rect, body_rect, camera_hb.time, camera_hb.frameIndex

    def construct_json(self, hb_timestamp, subject_rect: NormalizedRect):
        """Build the JSON-serializable record for one heartbeat timestamp.

        Args:
            hb_timestamp: Timestamp taken from the heartbeat.
            subject_rect: Normalized subject rect, or None when no subject was
                found for this frame.

        Returns:
            Dict with "time" and a "body" list (empty when subject_rect is None).
        """
        body = []
        if subject_rect is not None:
            body = [{
                "y": subject_rect.min_y,
                "w": subject_rect.width,
                "x": subject_rect.min_x,
                "h": subject_rect.height
            }]

        return {
            "time": hb_timestamp,
            "body": body
        }

    def display_rects(self, frame, face_rect, body_rect, subject_rect: NormalizedRect, width=1920, height=1440):
        """Draw the face, body and subject rects onto `frame` (in place).

        Colors are BGR: face=green, body=blue, subject=red. Any rect may be
        None, in which case it is skipped. Returns the same frame for chaining.
        """
        for rect, color in ((face_rect, (0, 255, 0)),
                            (body_rect, (255, 0, 0)),
                            (subject_rect, (0, 0, 255))):
            if rect is None:
                continue
            px = rect.to_pixel_rect(width, height)
            cv2.rectangle(frame, (px.min_x, px.min_y), (px.max_x, px.max_y), color, thickness=1)

        return frame

    def sanity_test_hb(self, start_frame, frames_count):
        """Warn when the number of distinct heartbeats disagrees with the
        number of JSON records produced by analyze().

        NOTE(review): `frames_count` is used as the *end* index of the range
        here, while analyze() treats its `frame_count` argument as a count —
        confirm callers pass consistent values.
        """
        heartbeats_count = 0
        # Start from None so a first heartbeat whose frameIndex is 0 is still
        # counted (frame index 0 is valid — __init__ queries it). The previous
        # initializer of 0 silently dropped that heartbeat and could trigger a
        # spurious mismatch warning below.
        last_frame_idx = None
        for idx in range(start_frame, frames_count):
            camera_hb = Heartbeats_GetCameraHeartbeatAtFrameidx(self.heartbeats, self.camera_id, idx)
            hb_frame_idx = camera_hb.frameIndex

            # Count each run of identical frame indices once.
            if last_frame_idx != hb_frame_idx:
                last_frame_idx = hb_frame_idx
                heartbeats_count += 1

        dumper_count = len(self.json_output)

        if heartbeats_count != dumper_count:
            log.warning("Number of face rects are not equal to subject rects [hb: {:d}, VS cv: {:d}]".format(heartbeats_count, dumper_count))

    def analyze(self, start_frame, frame_count):
        """Analyze `frame_count` matte frames starting at `start_frame`.

        Updates the cumulative subject bounding rect and, when heartbeat
        metadata is available, appends JSON records / writes annotated debug
        frames.
        """
        video_reader = CreateVideoReader(self.matte_path)
        video_reader.seek(start_frame)

        for frame_idx in range(frame_count):
            frame = video_reader.read()
            if frame is None:
                # Reader exhausted before frame_count frames were delivered;
                # bail out instead of crashing inside cvtColor.
                log.warning("Matte video ended early at frame {:d} of {:d}.".format(frame_idx, frame_count))
                break

            # cvtColor allocates a new output image, so no defensive copy of
            # the source frame is needed.
            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

            self.__analyze_matte(gray, frame_idx)

            if self.heartbeats is not None:
                # NOTE(review): frame_idx is relative to start_frame; if the
                # heartbeat lookup expects absolute frame indices this should
                # be start_frame + frame_idx — confirm against the Heartbeats
                # API and existing callers.
                self.__analyze_matte_hb(gray, frame_idx)

    def __analyze_matte(self, frame, frame_idx):
        """Update the per-frame and cumulative subject bounding rects from one
        grayscale matte frame."""
        self._frame_bounding_rect = self.estimate_subject_boundary(frame)

        if self._bounding_rect is None:
            self._bounding_rect = self._frame_bounding_rect
        else:
            # Grow the cumulative rect to cover this frame's subject.
            # NOTE(review): assumes NormalizedRect.union mutates in place (its
            # return value is discarded) — confirm.
            self._bounding_rect.union(self._frame_bounding_rect)

    def __analyze_matte_hb(self, frame, frame_idx):
        """Cross-reference the current frame with heartbeat metadata: emit a
        JSON record when the heartbeat frame index matches, and optionally
        write an annotated frame to the debug video."""
        face_rect, body_rect, hb_timestamp, hb_frame_idx = self.__get_face_body_rects(frame_idx)

        # Only emit a record when the heartbeat actually refers to the frame
        # being analyzed (heartbeats may repeat across frames).
        if frame_idx == hb_frame_idx:
            self.json_output.append(self.construct_json(hb_timestamp, self._frame_bounding_rect))

        if self.transparent_video:
            # Promote the single-channel matte to 3 channels so the colored
            # overlay rectangles are visible; cvtColor allocates a new image,
            # so no defensive copy is needed.
            segmap_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
            annotated = self.display_rects(segmap_frame, face_rect, body_rect, self._frame_bounding_rect,
                                           width=self.video_info.width, height=self.video_info.height)

            self.transparent_video_encoder.write(annotated)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|