vfx-2 / human-segmentation /MatteAnalyzer.py
TaqiRaza512's picture
Initial commit
a103028
from math import floor
import cv2
from heartbeats import *
from python_core import *
class MatteAnalyzer:
    """Track the bounding rectangle of the largest contour in a matte video.

    Each frame read from ``matte_path`` is converted to grayscale, the
    bounding box of its largest contour is taken as the subject rectangle
    for that frame, and the rectangle is merged into a running union that
    is available through :meth:`bounding_rect`.

    When a heartbeats file is available, one JSON-style record per matching
    heartbeat is appended to ``json_output``; when ``transparent_video_path``
    is given as well, every analysed frame is written to that video with the
    face, body and subject rectangles drawn on top.
    """

    def __init__(self, heartbeats_path, video_path, matte_path, transparent_video_path, opencv_encoding=False):
        """Load video metadata, optional heartbeats and the optional overlay writer.

        Args:
            heartbeats_path: Path of the heartbeats file. Heartbeats are only
                loaded when ``FileExists`` reports that the path exists.
            video_path: Path of the source video. Used for ``VideoInfo`` and to
                derive the camera id.
            matte_path: Path of the matte video that :meth:`analyze` reads.
            transparent_video_path: Output path for the overlay video, or
                ``None`` to disable writing it.
            opencv_encoding: Boolean flag. When falsy the writer is created
                with ``gpu_if_available=True``.
        """
        self.video_info = VideoInfo(video_path)
        self.video_path = video_path
        self.matte_path = matte_path

        # Defined up front so the attributes exist on every instance, even
        # when no valid heartbeats file could be loaded.
        self.heartbeats = None
        self.camera_id = None
        self.hb_frame_offset = None
        if FileExists(heartbeats_path):
            self.heartbeats = Heartbeats_Init(heartbeats_path)
            if not Heartbeats_IsValid(self.heartbeats):
                log.fatal("Could not load heartbeat file => {:s}.".format(self.heartbeats.errorResult))
            else:
                self.camera_id = CameraId_InitFromFileName(video_path)
                # Frame index reported by the heartbeat looked up for frame 0.
                _, _, _, self.hb_frame_offset = self.__get_face_body_rects(0)

        self.transparent_video = transparent_video_path is not None
        self.transparent_video_encoder = None
        if self.transparent_video:
            self.transparent_video_encoder = CreateVideoWriter(
                video_path=transparent_video_path,
                frame_rate=self.video_info.frame_rate,
                width=self.video_info.width,
                height=self.video_info.height,
                monochrome_video=True,
                encoding_bframes=0,
                cleanup=True,
                gpu_if_available=not opencv_encoding,
            )

        # One dict per heartbeat-aligned frame, see construct_json().
        self.json_output = []
        # Not used by any method of this class; kept for a threaded variant
        # of the analysis that is currently disabled.
        self.thread = None
        # Maximum area covered by subject
        self._bounding_rect = None
        # Bounding rect at current frame
        self._frame_bounding_rect = None

    # --------------------------------------------------------------------------------------------------------------
    def bounding_rect(self):
        """Return the accumulated subject rectangle, or ``None`` before any frame was analysed."""
        return self._bounding_rect

    # --------------------------------------------------------------------------------------------------------------
    def prune_contours(self, conts, width, height):
        """Reduce a list of contours to the bounding box of the largest one.

        "Largest" means the biggest ``w * h`` of ``cv2.boundingRect``; on a tie
        the first such contour wins.

        Args:
            conts: Iterable of contours as accepted by ``cv2.boundingRect``.
            width: Width of the image the contours were found in.
            height: Height of the image the contours were found in.

        Returns:
            The chosen box converted with ``PixelRect.to_normalized_rect(width,
            height)``. When ``conts`` is empty, the 1x1 pixel rect at the
            origin is converted instead.
        """
        boxes = (cv2.boundingRect(cont) for cont in conts)
        # Each box is (x, y, w, h); max() keeps the first box with the biggest area.
        largest = max(boxes, key=lambda box: box[2] * box[3], default=None)
        if largest is None:
            pixel_rect = PixelRect(0, 0, 1, 1)
        else:
            pixel_rect = PixelRect(largest[0], largest[1], largest[2], largest[3])
        return pixel_rect.to_normalized_rect(width, height)

    # --------------------------------------------------------------------------------------------------------------
    def estimate_subject_boundary(self, gray, res_fact=2.67):
        """Estimate the subject rectangle of one single-channel frame.

        The frame is downscaled by ``res_fact`` before contour extraction; the
        result is normalised against the downscaled size.

        Args:
            gray: Single-channel image as accepted by ``cv2.findContours``.
            res_fact: Divisor applied to both image dimensions.

        Returns:
            The rectangle produced by :meth:`prune_contours`.
        """
        h, w = gray.shape[:2]
        # Clamp to one pixel so cv2.resize never receives an empty target size.
        sw = max(1, floor(w / res_fact))
        sh = max(1, floor(h / res_fact))
        # cv2.resize returns a new array, so the input frame is left untouched.
        small = cv2.resize(gray, (sw, sh))
        # findContours returns (contours, hierarchy) on OpenCV 2.x/4.x and
        # (image, contours, hierarchy) on 3.x; [-2] is the contour list on all.
        conts = cv2.findContours(small, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)[-2]
        return self.prune_contours(conts, sw, sh)

    # --------------------------------------------------------------------------------------------------------------
    def __get_face_body_rects(self, idx):
        """Fetch the face and body rects of this camera's heartbeat for frame ``idx``.

        Returns:
            ``(face_rect, body_rect, time, frame_index)``. Both rects are
            ``None`` when the heartbeat reports no face rects, otherwise they
            are ``NormalizedRect`` instances.
        """
        camera_hb = Heartbeats_GetCameraHeartbeatAtFrameidx(self.heartbeats, self.camera_id, idx)
        if camera_hb.faceRectsCount == 0:
            return None, None, camera_hb.time, camera_hb.frameIndex

        # NOTE(review): faceRects / bodyRects are read as a single rect, and
        # bodyRects is read without checking a body count - confirm against the
        # heartbeats API.
        face = camera_hb.faceRects
        face_rect = NormalizedRect(face.x, face.y, face.width, face.height)
        body = camera_hb.bodyRects
        body_rect = NormalizedRect(body.x, body.y, body.width, body.height)
        return face_rect, body_rect, camera_hb.time, camera_hb.frameIndex

    # --------------------------------------------------------------------------------------------------------------
    def construct_json(self, hb_timestamp, subject_rect: "NormalizedRect"):
        """Build the output record for one heartbeat.

        Args:
            hb_timestamp: Value stored under ``"time"``.
            subject_rect: Rectangle exposing ``min_x``, ``min_y``, ``width`` and
                ``height``, or ``None`` when there is no subject.

        Returns:
            ``{"time": ..., "body": [...]}`` where ``"body"`` is empty for a
            ``None`` rect and otherwise holds one ``x``/``y``/``w``/``h`` dict.
        """
        body = []
        if subject_rect is not None:
            body.append({
                "y": subject_rect.min_y,
                "w": subject_rect.width,
                "x": subject_rect.min_x,
                "h": subject_rect.height,
            })
        return {"time": hb_timestamp, "body": body}

    # --------------------------------------------------------------------------------------------------------------
    def display_rects(self, frame, face_rect, body_rect, subject_rect: "NormalizedRect", width=1920, height=1440):
        """Draw the given rectangles onto ``frame`` and return it.

        ``frame`` is modified in place. Rects that are ``None`` are skipped; the
        others are converted with ``to_pixel_rect(width, height)`` and drawn as
        1-pixel outlines in the order face, body, subject.
        """
        overlays = (
            (face_rect, (0, 255, 0)),
            (body_rect, (255, 0, 0)),
            (subject_rect, (0, 0, 255)),
        )
        for rect, color in overlays:
            if rect is None:
                continue
            pixels = rect.to_pixel_rect(width, height)
            cv2.rectangle(frame, (pixels.min_x, pixels.min_y), (pixels.max_x, pixels.max_y), color, thickness=1)
        return frame

    # --------------------------------------------------------------------------------------------------------------
    def sanity_test_hb(self, start_frame, frames_count):
        """Warn when the number of distinct heartbeats differs from ``len(json_output)``.

        Walks ``range(start_frame, frames_count)`` and counts how often the
        heartbeat frame index changes from one lookup to the next.

        NOTE(review): ``frames_count`` acts as an exclusive end index here, not
        as a number of frames, and because the tracker starts at 0 a leading
        heartbeat whose frame index is 0 is never counted - confirm both are
        intended.
        """
        heartbeats_count = 0
        last_frame_idx = 0
        for idx in range(start_frame, frames_count):
            camera_hb = Heartbeats_GetCameraHeartbeatAtFrameidx(self.heartbeats, self.camera_id, idx)
            hb_frame_idx = camera_hb.frameIndex
            if hb_frame_idx != last_frame_idx:
                last_frame_idx = hb_frame_idx
                heartbeats_count += 1

        dumper_count = len(self.json_output)
        if heartbeats_count != dumper_count:
            log.warning("Number of face rects are not equal to subject rects [hb: {:d}, VS cv: {:d}]".format(heartbeats_count, dumper_count))

    # --------------------------------------------------------------------------------------------------------------
    def analyze(self, start_frame, frame_count):
        """Analyse ``frame_count`` frames of the matte video starting at ``start_frame``.

        Every frame updates the running bounding rect; when heartbeats are
        loaded it also goes through the heartbeat/overlay step.

        NOTE(review): the index handed to the per-frame steps counts from 0 for
        this call, it is not ``start_frame + i`` - confirm that this is what the
        heartbeat lookup expects.
        """
        video_reader = CreateVideoReader(self.matte_path)
        video_reader.seek(start_frame)
        for frame_idx in range(frame_count):
            frame = video_reader.read()
            if frame is None:
                # Assumes the reader signals end of stream with None; stop
                # instead of failing inside the colour conversion.
                log.warning("Matte video ended after {:d} of {:d} requested frames.".format(frame_idx, frame_count))
                break
            # In accordance with matte analyzer
            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            self.__analyze_matte(gray, frame_idx)
            if self.heartbeats is not None:
                self.__analyze_matte_hb(gray, frame_idx)

    # --------------------------------------------------------------------------------------------------------------
    def __analyze_matte(self, frame, frame_idx):
        """Update the per-frame rect and merge it into the running bounding rect.

        ``frame_idx`` is accepted for symmetry with ``__analyze_matte_hb`` and
        is not used.
        """
        self._frame_bounding_rect = self.estimate_subject_boundary(frame)
        if self._bounding_rect is None:
            self._bounding_rect = self._frame_bounding_rect
        else:
            # NOTE(review): the return value is ignored, so this relies on
            # union() growing the receiver in place - confirm.
            self._bounding_rect.union(self._frame_bounding_rect)

    # --------------------------------------------------------------------------------------------------------------
    def __analyze_matte_hb(self, frame, frame_idx):
        """Record the current subject rect against the heartbeat for ``frame_idx``.

        A JSON record is appended only when the heartbeat's own frame index
        equals ``frame_idx``. ``frame_idx`` is used as-is: ``hb_frame_offset``
        is not applied here. When the overlay video is enabled, the frame is
        written with all available rects drawn on it, whether or not the
        indices matched.
        """
        face_rect, body_rect, hb_timestamp, hb_frame_idx = self.__get_face_body_rects(frame_idx)

        # If frame index matches with that of heartbeats, add subject boundary/rect to json
        if frame_idx == hb_frame_idx:
            self.json_output.append(self.construct_json(hb_timestamp, self._frame_bounding_rect))

        # Display subject boundary/rect to every frame if transparent_video is to be written
        if self.transparent_video:
            # cvtColor allocates the 3-channel image, so drawing on it does not
            # touch the caller's grayscale frame.
            segmap_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
            annotated = self.display_rects(
                segmap_frame,
                face_rect,
                body_rect,
                self._frame_bounding_rect,
                width=self.video_info.width,
                height=self.video_info.height,
            )
            self.transparent_video_encoder.write(annotated)