# --- repository file-viewer residue (not code); kept as comments so the file parses ---
# xuan3986's picture
# Upload 111 files
# 03022ee verified
"""
This script uses pretrained models to extract speaker visual embeddings.
This script uses the following open-source models:
1. Face detection: https://github.com/Linzaer/Ultra-Light-Fast-Generic-Face-Detector-1MB
2. Active speaker detection: TalkNet, https://github.com/TaoRuijie/TalkNet-ASD
3. Face quality assessment: https://modelscope.cn/models/iic/cv_manual_face-quality-assessment_fqa
4. Face recognition: https://modelscope.cn/models/iic/cv_ir101_facerecognition_cfglint
5. Lip detection: https://huggingface.co/pyannote/segmentation-3.0
Processing pipeline:
1. Face detection (input: video frames)
2. Active speaker detection (input: consecutive face frames, audio)
3. Face quality assessment (input: video frames)
4. Face recognition (input: video frames)
5. Lip detection (input: video frames)
"""
import numpy as np
from scipy.io import wavfile
from scipy.interpolate import interp1d
import time, torch, cv2, pickle, gc, python_speech_features
from scipy import signal
class VisionProcesser():
    """Extract speaker visual embeddings from a video/audio pair.

    For each voice-activity interval in ``audio_vad`` the video is processed
    shot by shot: faces are detected on sampled frames, linked into tracks,
    scored by an active-speaker-detection model against the audio, and the
    best face per frame is embedded (plus a lip crop). Results accumulate in
    ``self.active_facial_embs`` and are pickled to ``out_feat_path`` by
    ``run()``. All models are injected via ``visual_models``; this class only
    orchestrates them.
    """

    def __init__(
        self,
        video_file_path,     # path to the input video
        audio_file_path,     # path to the matching WAV file
        audio_vad,           # iterable of (start_sec, end_sec) speech intervals
        out_feat_path,       # pickle path for the extracted features
        visual_models,       # object exposing detect_faces / asd_score / detect_lip / get_face_embedding
        conf=None,           # NOTE(review): defaulted to None but indexed unconditionally below -- effectively required
        out_video_path=None  # optional debug video with drawn detections
    ):
        # Read audio data and check the samplerate.
        fs, audio = wavfile.read(audio_file_path)
        if len(audio.shape) > 1:
            # Multi-channel -> mono by averaging channels.
            audio = audio.mean(axis=1)
        duration = audio.shape[0] / fs
        target_length = int(duration * 16000)
        # Resample to 16 kHz regardless of the source samplerate.
        self.audio = signal.resample(audio, target_length)
        # Convert time intervals (seconds) to integer sample-index intervals at 16 kHz.
        audio_vad = [[int(i*16000), int(j*16000)] for (i, j) in audio_vad]
        self.video_path = video_file_path
        # Read video data / metadata.
        self.cap = cv2.VideoCapture(video_file_path)
        w = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        h = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        self.count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        # NOTE(review): mixes str.format and %-formatting on purpose ('%s' survives
        # .format and is then filled by % self.video_path); fragile if the path
        # ever contains a literal '%'.
        print('video %s info: w: {}, h: {}, count: {}, fps: {}'.format(w, h, self.count, self.fps) % self.video_path)
        # Initial vision models (injected, not constructed here).
        self.visual_models = visual_models
        # Accumulators for facial feats along with the necessary bookkeeping:
        #   frameI    -- face frame indices where the speaker is active
        #   feat      -- 512-d face embeddings (one row per saved face)
        #   faceI     -- frame indices where a face is present
        #   face      -- BGR face crops
        #   face_bbox -- face boxes relative to the full frame
        #   lip       -- BGR lip crops
        #   lip_bbox  -- lip boxes relative to the face crop
        self.active_facial_embs = {
            'frameI':np.empty((0,), dtype=np.int32),
            'feat':np.empty((0, 512), dtype=np.float32),
            'faceI': np.empty((0,), dtype=np.int32),
            'face': [],
            'face_bbox': np.empty((0, 4), dtype=np.int32),
            'lip': [],
            'lip_bbox': np.empty((0, 4), dtype=np.int32),
        }
        self.audio_vad = audio_vad
        self.out_video_path = out_video_path
        self.out_feat_path = out_feat_path
        # Tracking / cropping parameters (all read from conf).
        self.min_track = conf['min_track']            # min track length in frames
        self.num_failed_det = conf['num_failed_det']  # max frame gap tolerated inside a track
        self.crop_scale = conf['crop_scale']          # padding ratio around the face crop
        self.min_face_size = conf['min_face_size']    # min mean face side length in pixels
        self.face_det_stride = conf['face_det_stride']# run face detection every N frames
        self.shot_stride = conf['shot_stride']        # process in shots of N frames
        if self.out_video_path is not None:
            # Save the active face detection results video (for debugging).
            self.v_out = cv2.VideoWriter(out_video_path, cv2.VideoWriter_fourcc(*'mp4v'), 25, (int(w), int(h)))
        # Record the time spent by each module.
        self.elapsed_time = {'faceTime':[], 'trackTime':[], 'cropTime':[],'asdTime':[], 'featTime':[], 'totalTime':[]}

    def run(self):
        """Process every VAD interval shot by shot, then pickle the results.

        Side effects: consumes and releases ``self.cap``, optionally releases
        the debug video writer, writes ``out_feat_path``, prints timing stats.
        """
        frames, face_det_frames = [], []
        for [audio_sample_st, audio_sample_ed] in self.audio_vad:
            # 16 kHz samplerate / 640 samples = 25 fps: convert audio sample
            # indices to 25 fps video frame indices.
            # NOTE(review): assumes the video is 25 fps -- self.fps is read but
            # not used here; confirm inputs are normalized upstream.
            frame_st, frame_ed = int(audio_sample_st/640), int(audio_sample_ed/640)
            num_frames = frame_ed - frame_st + 1
            # Go to frame 'frame_st'.
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_st)
            index = 0
            for _ in range(num_frames):
                ret, frame = self.cap.read()
                if not ret:
                    break
                if index % self.face_det_stride==0:
                    # Only every face_det_stride-th frame goes to the detector.
                    face_det_frames.append(frame)
                frames.append(frame)
                if (index + 1) % self.shot_stride==0:
                    # A full shot is buffered: slice the matching 16 kHz audio
                    # (640 samples per 25 fps frame) and process it.
                    audio = self.audio[(frame_st + index + 1 - self.shot_stride)*640:(frame_st + index + 1)*640]
                    self.process_one_shot(frames, face_det_frames, audio, frame_st + index + 1 - self.shot_stride)
                    frames, face_det_frames = [], []
                index += 1
            if len(frames) != 0:
                # Flush the trailing partial shot of this VAD interval.
                audio = self.audio[(frame_st + index - len(frames))*640:(frame_st + index)*640]
                self.process_one_shot(frames, face_det_frames, audio, frame_st + index - len(frames))
                frames, face_det_frames = [], []
        self.cap.release()
        if self.out_video_path is not None:
            self.v_out.release()
        out_data = {
            'embeddings':self.active_facial_embs['feat'], # 'times': self.active_facial_embs['frameI']*0.04, # 25 fps
            'frameI': self.active_facial_embs['frameI'],  # face frame indices where the speaker is active
            'faceI': self.active_facial_embs['faceI'],    # frame indices where a face is present
            'face': self.active_facial_embs['face'],
            'face_bbox': self.active_facial_embs['face_bbox'],
            'lip': self.active_facial_embs['lip'],
            'lip_bbox': self.active_facial_embs['lip_bbox'],
        }
        pickle.dump(out_data, open(self.out_feat_path, 'wb'))
        # Print elapsed time (collapse each per-shot list into its sum).
        all_elapsed_time = 0
        for k in self.elapsed_time:
            all_elapsed_time += sum(self.elapsed_time[k])
            self.elapsed_time[k] = sum(self.elapsed_time[k])
        elapsed_time_msg = 'The total time for %s is %.2fs, including' % (self.video_path, all_elapsed_time)
        for k in self.elapsed_time:
            elapsed_time_msg += ' %s %.2fs,'%(k, self.elapsed_time[k])
        # Drop the trailing comma and end the sentence.
        print(elapsed_time_msg[:-1]+'.')
        try:
            del out_data
        except Exception:
            pass

    def process_one_shot(self, frames, face_det_frames, audio, frame_st=None):
        """Run the full pipeline on one shot and merge results into the accumulators.

        frames          -- every frame of the shot
        face_det_frames -- the subsampled frames fed to the face detector
        audio           -- the 16 kHz audio slice aligned with `frames`
        frame_st        -- absolute frame offset of this shot (added to indices)
        """
        curTime = time.time()
        dets = self.face_detection(face_det_frames)
        faceTime = time.time()
        allTracks, vidTracks = [], []
        allTracks.extend(self.track_shot(dets))
        trackTime = time.time()
        for ii, track in enumerate(allTracks):
            vidTracks.append(self.crop_video(track, frames, audio))
        cropTime = time.time()
        scores = self.evaluate_asd(vidTracks)
        asdTime = time.time()
        active_facial_embs = self.evaluate_fr(frames, vidTracks, scores)
        # Shift shot-relative indices by frame_st to make them absolute.
        self.active_facial_embs['frameI'] = np.append(self.active_facial_embs['frameI'], active_facial_embs['frameI'] + frame_st)
        self.active_facial_embs['feat'] = np.append(self.active_facial_embs['feat'], active_facial_embs['feat'], axis=0)
        self.active_facial_embs['faceI'] = np.append(self.active_facial_embs['faceI'], active_facial_embs['faceI'] + frame_st)
        self.active_facial_embs['face'].extend(active_facial_embs['face'])
        self.active_facial_embs['face_bbox'] = np.vstack([self.active_facial_embs['face_bbox'], active_facial_embs['face_bbox']])
        self.active_facial_embs['lip'].extend(active_facial_embs['lip'])
        self.active_facial_embs['lip_bbox']= np.vstack([self.active_facial_embs['lip_bbox'], active_facial_embs['lip_bbox']])
        featTime = time.time()
        if self.out_video_path is not None:
            self.visualization(frames, vidTracks, scores, active_facial_embs)
        try:
            del dets, allTracks, vidTracks, active_facial_embs
        except Exception:
            pass
        self.elapsed_time['faceTime'].append(faceTime-curTime)
        self.elapsed_time['trackTime'].append(trackTime-faceTime)
        self.elapsed_time['cropTime'].append(cropTime-trackTime)
        self.elapsed_time['asdTime'].append(asdTime-cropTime)
        self.elapsed_time['featTime'].append(featTime-asdTime)
        self.elapsed_time['totalTime'].append(featTime-curTime)

    def face_detection(self, frames):
        """Detect faces on each (subsampled) frame.

        Returns a list with one entry per input frame; each entry is a list of
        dicts {'frame': shot-relative frame index, 'bbox': [x1,y1,x2,y2], 'conf': score}.
        """
        dets = []
        for fidx, image in enumerate(frames):
            # Detector expects RGB; cv2 frames are BGR.
            image_input = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            bboxes, _, probs = self.visual_models.detect_faces(image_input, top_k=10, prob_threshold=0.9)
            bboxes = torch.cat([bboxes, probs.reshape(-1, 1)], dim=-1)
            dets.append([])
            for bbox in bboxes:
                # Map back to the original frame index (inputs were strided).
                frame_idex = fidx * self.face_det_stride
                dets[-1].append({'frame':frame_idex, 'bbox':(bbox[:-1]).tolist(), 'conf':bbox[-1]})
        return dets

    def bb_intersection_over_union(self, boxA, boxB, evalCol=False):
        # IOU function to calculate overlap between two boxes (x1, y1, x2, y2).
        # When evalCol is True the intersection is normalized by boxA's area
        # only, instead of the union.
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[2], boxB[2])
        yB = min(boxA[3], boxB[3])
        interArea = max(0, xB - xA) * max(0, yB - yA)
        boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
        boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
        if evalCol == True:
            iou = interArea / float(boxAArea)
        else:
            iou = interArea / float(boxAArea + boxBArea - interArea)
        return iou

    def track_shot(self, scene_faces):
        """Face tracking: greedily link per-frame detections into tracks.

        Mutates ``scene_faces`` in place (consumed detections are removed).
        Kept tracks are interpolated to one bbox per frame and filtered by
        ``min_track`` length and ``min_face_size``.
        """
        tracks = []
        while True:  # continuously search for consecutive faces.
            track = []
            for frame_faces in scene_faces:
                for face in frame_faces:
                    if track == []:
                        track.append(face)
                        frame_faces.remove(face)
                        break
                    elif face['frame'] - track[-1]['frame'] <= self.num_failed_det:
                        # The face does not interrupt for 'num_failed_det' frames.
                        iou = self.bb_intersection_over_union(face['bbox'], track[-1]['bbox'])
                        # Minimum IOU between consecutive faces.
                        if iou > 0.5:
                            track.append(face)
                            frame_faces.remove(face)
                            break
                    else:
                        break
            if track == []:
                # No detection could seed a new track: all faces consumed.
                break
            elif len(track) > 1 and track[-1]['frame'] - track[0]['frame'] + 1 >= self.min_track:
                frame_num = np.array([ f['frame'] for f in track ])
                bboxes = np.array([np.array(f['bbox']) for f in track])
                frameI = np.arange(frame_num[0], frame_num[-1]+1)
                bboxesI = []
                for ij in range(0, 4):
                    # Missing boxes can be filled by interpolation (per coordinate).
                    interpfn = interp1d(frame_num, bboxes[:,ij])
                    bboxesI.append(interpfn(frameI))
                bboxesI = np.stack(bboxesI, axis=1)
                # Need mean face size > min_face_size.
                if max(np.mean(bboxesI[:,2]-bboxesI[:,0]), np.mean(bboxesI[:,3]-bboxesI[:,1])) > self.min_face_size:
                    tracks.append({'frame':frameI,'bbox':bboxesI})
        return tracks

    def crop_video(self, track, frames, audio):
        """Crop the 224x224 face clip for a track plus its aligned audio slice.

        Returns {'track': input track, 'proc_track': smoothed crop params,
        'data': [face frames, audio samples]}.
        """
        crop_frames = []
        dets = {'x':[], 'y':[], 's':[]}
        for det in track['bbox']:
            # Half of the larger box side = crop radius.
            dets['s'].append(max((det[3]-det[1]), (det[2]-det[0]))/2)
            dets['y'].append((det[1]+det[3])/2)  # crop center y
            dets['x'].append((det[0]+det[2])/2)  # crop center x
        for fidx, frame in enumerate(track['frame']):
            cs = self.crop_scale
            bs = dets['s'][fidx]  # detection box size
            bsi = int(bs * (1 + 2 * cs))  # pad videos by this amount
            image = frames[frame]
            # Constant-pad so crops near the border stay in bounds
            # (110 = mid-gray fill value).
            frame = np.pad(image, ((bsi,bsi), (bsi,bsi), (0, 0)), 'constant', constant_values=(110, 110))
            my = dets['y'][fidx] + bsi  # BBox center Y (in padded coords)
            mx = dets['x'][fidx] + bsi  # BBox center X (in padded coords)
            face = frame[int(my-bs):int(my+bs*(1+2*cs)),int(mx-bs*(1+cs)):int(mx+bs*(1+cs))]
            crop_frames.append(cv2.resize(face, (224, 224)))
        # 640 samples of 16 kHz audio per 25 fps frame.
        cropaudio = audio[track['frame'][0]*640:(track['frame'][-1]+1)*640]
        return {'track':track, 'proc_track':dets, 'data':[crop_frames, cropaudio]}

    def evaluate_asd(self, tracks):
        """Active speaker detection by pretrained TalkNet.

        Returns one float32 score array per track (higher = more likely speaking).
        """
        all_scores = []
        for ins in tracks:
            video, audio = ins['data']
            # MFCC at 100 feature frames per second (winstep 10 ms).
            audio_feature = python_speech_features.mfcc(audio, 16000, numcep = 13, winlen = 0.025, winstep = 0.010)
            video_feature = []
            for frame in video:
                face = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                h0, w0 = face.shape
                # Cubic upsampling for small faces, area downsampling otherwise.
                interp = cv2.INTER_CUBIC if (h0 < 224 or w0 < 224) else cv2.INTER_AREA
                face = cv2.resize(face, (224,224), interpolation=interp)
                # face = cv2.resize(face, (224,224))
                # Keep the central 112x112 region.
                face = face[int(112-(112/2)):int(112+(112/2)), int(112-(112/2)):int(112+(112/2))]
                video_feature.append(face)
            video_feature = np.array(video_feature)
            # Trim audio (100 fps) and video (25 fps) to a common duration.
            length = min((audio_feature.shape[0] - audio_feature.shape[0] % 4) / 100, video_feature.shape[0] / 25)
            audio_feature = audio_feature[:int(round(length * 100)),:]
            video_feature = video_feature[:int(round(length * 25)),:,:]
            # Add the batch dimension expected by the model.
            audio_feature = np.expand_dims(audio_feature, axis=0).astype(np.float32)
            video_feature = np.expand_dims(video_feature, axis=0).astype(np.float32)
            score = self.visual_models.asd_score(audio_feature, video_feature)
            all_score = np.asarray(score, dtype=np.float32)
            all_scores.append(all_score)
            try:
                del audio_feature, video_feature, score
            except Exception:
                pass
        return all_scores

    def evaluate_fr(self, frames, tracks, scores):
        """Select the best face per frame, run lip detection and face recognition.

        Returns a dict of shot-relative results (frameI/trackI/faceI as int32
        arrays, face/lip as BGR crops, feat as an (N, 512) float32 matrix).
        """
        SMOOTH_W = 4          # half-window (frames) for local score averaging
        ON_THRESHOLD = 0.0    # NOTE(review): currently unused below
        OFF_THRESHOLD = -0.5  # scores below this are treated as "not speaking"
        QUALITY_HIGH = 0.0    # used only by the commented-out quality gate
        QUALITY_LOW = -0.3    # used only by the commented-out quality gate
        # Smooth each track's scores first.
        smooth_scores_all = []
        for score in scores:
            s = np.asarray(score).flatten()
            if s.size == 0:
                smooth_scores_all.append(s)
                continue
            # Median filter + simple moving average.
            s_med = signal.medfilt(s, kernel_size=5 if len(s)>=5 else 3)
            k = np.ones(5)/5
            s_avg = np.convolve(s_med, k, mode='same')
            smooth_scores_all.append(s_avg)
        # Aggregate faces per frame.
        faces = [[] for _ in range(len(frames))]
        for tidx, track in enumerate(tracks):
            score = smooth_scores_all[tidx]
            for fidx, frame in enumerate(track['track']['frame'].tolist()):
                # Local mean over a +/- SMOOTH_W window of the smoothed score.
                s = score[max(fidx - SMOOTH_W, 0): min(fidx + SMOOTH_W+1, len(score))]
                s = float(np.mean(s))
                bbox = track['track']['bbox'][fidx]
                bbox = bbox.astype(np.int32)
                # Clamp the crop to the frame boundaries.
                face = frames[frame][max(bbox[1],0):min(bbox[3],frames[frame].shape[0]),
                                     max(bbox[0],0):min(bbox[2],frames[frame].shape[1])]
                faces[frame].append({'track':tidx, 'score':s, 'facedata':face, 'bbox': bbox})
        # Per-frame decision accumulators.
        active_facial_embs = {
            'frameI': [],
            'trackI': [],
            'faceI': [],
            'face': [],
            'face_bbox': [],
            'feat': [],
            'lip': [],
            'lip_bbox': [],
        }
        # Simple per-frame decision: pick the face with the highest score.
        for fidx in range(0, len(faces), max(1, self.face_det_stride)):
            if len(faces[fidx]) == 0:
                continue
            # Choose best candidate by score.
            best = max(faces[fidx], key=lambda x: x['score'])
            res = self.visual_models.detect_lip(best['facedata'])
            # If no lip is detected, skip -- this also filters out
            # low-quality / low-resolution faces.
            if res is None or res.get('lip_crop') is None:
                continue
            # Whenever the frame contains one or more faces, save the one most
            # likely to be the speaker (best['facedata']), whether or not it is
            # actually speaking.
            active_facial_embs['faceI'].append(fidx)
            active_facial_embs['face'].append(best['facedata'])  # BGR ndarray
            active_facial_embs['lip'].append(res.get('lip_crop'))  # BGR ndarray
            active_facial_embs['face_bbox'].append(best['bbox'])  # face coords relative to the full frame
            active_facial_embs['lip_bbox'].append(res.get('lip_bbox'))  # lip coords relative to the face crop
            feature = self.visual_models.get_face_embedding(best['facedata'])
            active_facial_embs['feat'].append(feature)  # full face embedding
            s = best['score']
            if s < OFF_THRESHOLD:
                continue
            # Face quality assessment (optional; when enabled, only
            # higher-quality face frames would be kept):
            # face_q_score = self.visual_models.face_quality_score(best['facedata'])
            # if (face_q_score >= QUALITY_HIGH) or (face_q_score >= QUALITY_LOW and s >= ON_THRESHOLD):
            if s >= OFF_THRESHOLD:
                # feature, feature_normalized = self.visual_models.get_face_embedding(best['facedata'])  # keep only frames the model judges as speaking
                active_facial_embs['frameI'].append(fidx)
                active_facial_embs['trackI'].append(best['track'])
        # Convert lists to numpy arrays (empty-safe for the bbox/feat cases).
        active_facial_embs['frameI'] = np.array(active_facial_embs['frameI'], dtype=np.int32)
        active_facial_embs['trackI'] = np.array(active_facial_embs['trackI'], dtype=np.int32)
        active_facial_embs['faceI'] = np.array(active_facial_embs['faceI'], dtype=np.int32)
        active_facial_embs['face_bbox'] = np.array(active_facial_embs['face_bbox'], dtype=np.int32) if active_facial_embs['face_bbox'] else np.empty((0,4), np.int32)
        active_facial_embs['lip_bbox'] = np.array(active_facial_embs['lip_bbox'], dtype=np.int32) if active_facial_embs['lip_bbox'] else np.empty((0,4), np.int32)
        active_facial_embs['feat'] = np.vstack(active_facial_embs['feat']) if active_facial_embs['feat'] else np.empty((0,512), np.float32)
        return active_facial_embs

    def visualization(self, frames, tracks, scores, embs=None):
        """Draw per-frame detections into the debug video (``self.v_out``).

        Green box = saved (active-speaker) face, red box = not saved,
        blue box = lip region; the smoothed score is printed above each box.
        """
        # Aggregate per-frame bbox/score info across all tracks
        # (same scheme as evaluate_fr, but with a fixed +/-2 window).
        faces = [[] for _ in range(len(frames))]
        for tidx, track in enumerate(tracks):
            score = scores[tidx]
            for fidx, frame in enumerate(track['track']['frame'].tolist()):
                s = score[max(fidx - 2, 0): min(fidx + 3, len(score))]  # note len(score) is the upper bound
                s = np.mean(s)
                faces[frame].append({'track':tidx, 'score':float(s),'bbox':track['track']['bbox'][fidx]})
        # Build the set of saved (frame, track) pairs (relative to this shot).
        feat_set = set()
        lip_bbox_dict = {}  # lip bounding boxes keyed by frame index
        if embs is not None:
            if 'frameI' in embs and embs['frameI'].size > 0:
                trackI = embs.get('trackI')
                feat_set = set((int(f), int(t)) for f, t in zip(embs['frameI'].tolist(), trackI.tolist()))
            if 'lip_bbox' in embs and embs['lip_bbox'].size > 0:
                for i, frame_idx in enumerate(embs['faceI']):
                    lip_bbox_dict[int(frame_idx)] = embs['lip_bbox'][i]
        for fidx, image in enumerate(frames):
            for face in faces[fidx]:
                bbox = face['bbox']
                x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
                # Lip bbox: stored relative to the face crop, so offset by the
                # face's top-left corner.
                lip_bbox = None
                if fidx in lip_bbox_dict:
                    lip_bbox = lip_bbox_dict[fidx]
                    lip_x1 = x1 + lip_bbox[0]
                    lip_y1 = y1 + lip_bbox[1]
                    lip_x2 = x1 + lip_bbox[2]
                    lip_y2 = y1 + lip_bbox[3]
                if (fidx, face['track']) in feat_set:
                    # Green = saved face, blue = lip box.
                    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    if lip_bbox is not None:
                        cv2.rectangle(image, (lip_x1, lip_y1), (lip_x2, lip_y2), (255, 0, 0), 2)
                    txt = round(face['score'], 2)
                    cv2.putText(image, '%s'%(txt), (x1, max(y1-6,0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
                else:
                    # Red = not saved.
                    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)
                    if lip_bbox is not None:
                        cv2.rectangle(image, (lip_x1, lip_y1), (lip_x2, lip_y2), (255, 0, 0), 2)
                    txt = round(face['score'], 2)
                    cv2.putText(image, '%s'%(txt), (x1, max(y1-6,0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 1)
            # Write the annotated frame to the debug video.
            self.v_out.write(image)

    def close(self):
        """Best-effort release of the accumulated face data; never raises."""
        try:
            if hasattr(self, "active_facial_embs"):
                for k, v in self.active_facial_embs.items():
                    if isinstance(v, np.ndarray):
                        # NOTE(review): deletes only the local name 'v'; the
                        # dict still holds the array until .clear() below.
                        del v
                    elif isinstance(v, list):
                        v.clear()
                self.active_facial_embs.clear()
        except Exception as e:
            print(f"[WARN] Error while closing VisionProcesser: {e}")
        gc.collect()

    def __del__(self):
        # Ensure buffers are released even if close() was never called.
        self.close()