""" This script uses pretrained models to perform speaker visual embeddings extracting. This script use following open source models: 1. Face detection: https://github.com/Linzaer/Ultra-Light-Fast-Generic-Face-Detector-1MB 2. Active speaker detection: TalkNet, https://github.com/TaoRuijie/TalkNet-ASD 3. Face quality assessment: https://modelscope.cn/models/iic/cv_manual_face-quality-assessment_fqa 4. Face recognition: https://modelscope.cn/models/iic/cv_ir101_facerecognition_cfglint 5. Lip detection: https://huggingface.co/pyannote/segmentation-3.0 Processing pipeline: 1. Face detection (input: video frames) 2. Active speaker detection (input: consecutive face frames, audio) 3. Face quality assessment (input: video frames) 4. Face recognition (input: video frames) 5. Lip detection (input: video frames) """ import numpy as np from scipy.io import wavfile from scipy.interpolate import interp1d import time, torch, cv2, pickle, gc, python_speech_features from scipy import signal class VisionProcesser(): def __init__( self, video_file_path, audio_file_path, audio_vad, out_feat_path, visual_models, conf=None, out_video_path=None ): # read audio data and check the samplerate. fs, audio = wavfile.read(audio_file_path) if len(audio.shape) > 1: audio = audio.mean(axis=1) duration = audio.shape[0] / fs target_length = int(duration * 16000) self.audio = signal.resample(audio, target_length) # convert time interval to integer sampling point interval. audio_vad = [[int(i*16000), int(j*16000)] for (i, j) in audio_vad] self.video_path = video_file_path # read video data self.cap = cv2.VideoCapture(video_file_path) w = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) h = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) self.count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.fps = self.cap.get(cv2.CAP_PROP_FPS) print('video %s info: w: {}, h: {}, count: {}, fps: {}'.format(w, h, self.count, self.fps) % self.video_path) # initial vision models self.visual_models = visual_models # store facial feats along with the necessary information. self.active_facial_embs = { 'frameI':np.empty((0,), dtype=np.int32), 'feat':np.empty((0, 512), dtype=np.float32), 'faceI': np.empty((0,), dtype=np.int32), 'face': [], 'face_bbox': np.empty((0, 4), dtype=np.int32), 'lip': [], 'lip_bbox': np.empty((0, 4), dtype=np.int32), } self.audio_vad = audio_vad self.out_video_path = out_video_path self.out_feat_path = out_feat_path self.min_track = conf['min_track'] self.num_failed_det = conf['num_failed_det'] self.crop_scale = conf['crop_scale'] self.min_face_size = conf['min_face_size'] self.face_det_stride = conf['face_det_stride'] self.shot_stride = conf['shot_stride'] if self.out_video_path is not None: # save the active face detection results video (for debugging). self.v_out = cv2.VideoWriter(out_video_path, cv2.VideoWriter_fourcc(*'mp4v'), 25, (int(w), int(h))) # record the time spent by each module. self.elapsed_time = {'faceTime':[], 'trackTime':[], 'cropTime':[],'asdTime':[], 'featTime':[], 'totalTime':[]} def run(self): frames, face_det_frames = [], [] for [audio_sample_st, audio_sample_ed] in self.audio_vad: frame_st, frame_ed = int(audio_sample_st/640), int(audio_sample_ed/640) # 16000采样率/640=25fps,转换为视频的25fps帧数 num_frames = frame_ed - frame_st + 1 # go to frame 'frame_st'. 
            # Seek to frame 'frame_st'.
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_st)
            index = 0
            for _ in range(num_frames):
                ret, frame = self.cap.read()
                if not ret:
                    break
                if index % self.face_det_stride == 0:
                    face_det_frames.append(frame)
                frames.append(frame)
                if (index + 1) % self.shot_stride == 0:
                    audio = self.audio[(frame_st + index + 1 - self.shot_stride) * 640:
                                       (frame_st + index + 1) * 640]
                    self.process_one_shot(frames, face_det_frames, audio,
                                          frame_st + index + 1 - self.shot_stride)
                    frames, face_det_frames = [], []
                index += 1
            if len(frames) != 0:
                audio = self.audio[(frame_st + index - len(frames)) * 640:(frame_st + index) * 640]
                self.process_one_shot(frames, face_det_frames, audio, frame_st + index - len(frames))
                frames, face_det_frames = [], []
        self.cap.release()
        if self.out_video_path is not None:
            self.v_out.release()

        out_data = {
            'embeddings': self.active_facial_embs['feat'],
            # 'times': self.active_facial_embs['frameI'] * 0.04,  # 25 fps
            'frameI': self.active_facial_embs['frameI'],  # frame indices where the speaker is actively talking
            'faceI': self.active_facial_embs['faceI'],    # frame indices where a face is present
            'face': self.active_facial_embs['face'],
            'face_bbox': self.active_facial_embs['face_bbox'],
            'lip': self.active_facial_embs['lip'],
            'lip_bbox': self.active_facial_embs['lip_bbox'],
        }
        pickle.dump(out_data, open(self.out_feat_path, 'wb'))

        # Print the elapsed time. 'totalTime' already spans the individual
        # stages, so report it as the overall total instead of summing every
        # key, which would double count.
        for k in self.elapsed_time:
            self.elapsed_time[k] = sum(self.elapsed_time[k])
        all_elapsed_time = self.elapsed_time['totalTime']
        elapsed_time_msg = 'The total time for %s is %.2fs, including' % (self.video_path, all_elapsed_time)
        for k in self.elapsed_time:
            elapsed_time_msg += ' %s %.2fs,' % (k, self.elapsed_time[k])
        print(elapsed_time_msg[:-1] + '.')
        try:
            del out_data
        except Exception:
            pass

    def process_one_shot(self, frames, face_det_frames, audio, frame_st=None):
        curTime = time.time()
        dets = self.face_detection(face_det_frames)
        faceTime = time.time()
        allTracks, vidTracks = [], []
        allTracks.extend(self.track_shot(dets))
        trackTime = time.time()
        for ii, track in enumerate(allTracks):
            vidTracks.append(self.crop_video(track, frames, audio))
        cropTime = time.time()
        scores = self.evaluate_asd(vidTracks)
        asdTime = time.time()
        active_facial_embs = self.evaluate_fr(frames, vidTracks, scores)
        # Offset the shot-relative frame indices by the shot start frame before
        # appending to the video-level accumulators.
        self.active_facial_embs['frameI'] = np.append(self.active_facial_embs['frameI'],
                                                      active_facial_embs['frameI'] + frame_st)
        self.active_facial_embs['feat'] = np.append(self.active_facial_embs['feat'],
                                                    active_facial_embs['feat'], axis=0)
        self.active_facial_embs['faceI'] = np.append(self.active_facial_embs['faceI'],
                                                     active_facial_embs['faceI'] + frame_st)
        self.active_facial_embs['face'].extend(active_facial_embs['face'])
        self.active_facial_embs['face_bbox'] = np.vstack([self.active_facial_embs['face_bbox'],
                                                          active_facial_embs['face_bbox']])
        self.active_facial_embs['lip'].extend(active_facial_embs['lip'])
        self.active_facial_embs['lip_bbox'] = np.vstack([self.active_facial_embs['lip_bbox'],
                                                         active_facial_embs['lip_bbox']])
        featTime = time.time()
        if self.out_video_path is not None:
            self.visualization(frames, vidTracks, scores, active_facial_embs)
        try:
            del dets, allTracks, vidTracks, active_facial_embs
        except Exception:
            pass
        self.elapsed_time['faceTime'].append(faceTime - curTime)
        self.elapsed_time['trackTime'].append(trackTime - faceTime)
        self.elapsed_time['cropTime'].append(cropTime - trackTime)
        self.elapsed_time['asdTime'].append(asdTime - cropTime)
        self.elapsed_time['featTime'].append(featTime - asdTime)
        self.elapsed_time['totalTime'].append(featTime - curTime)

    def face_detection(self, frames):
        dets = []
        for fidx, image in enumerate(frames):
            image_input = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
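            # Assumed interface for the Ultra-Light face detector wrapper (not
            # defined in this file): detect_faces() is treated as returning
            # (bboxes, labels, probs), where bboxes is an (N, 4) tensor of
            # [x1, y1, x2, y2] corners and probs an (N,) confidence tensor,
            # which is what the torch.cat below relies on.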
            bboxes, _, probs = self.visual_models.detect_faces(image_input, top_k=10,
                                                               prob_threshold=0.9)
            bboxes = torch.cat([bboxes, probs.reshape(-1, 1)], dim=-1)
            dets.append([])
            for bbox in bboxes:
                # Detection only runs on every 'face_det_stride'-th frame, so
                # map the detection index back to the real frame index.
                frame_idx = fidx * self.face_det_stride
                dets[-1].append({'frame': frame_idx, 'bbox': (bbox[:-1]).tolist(),
                                 'conf': bbox[-1]})
        return dets

    def bb_intersection_over_union(self, boxA, boxB, evalCol=False):
        # IoU function to calculate the overlap between two boxes.
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[2], boxB[2])
        yB = min(boxA[3], boxB[3])
        interArea = max(0, xB - xA) * max(0, yB - yA)
        boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
        boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
        if evalCol:
            iou = interArea / float(boxAArea)
        else:
            iou = interArea / float(boxAArea + boxBArea - interArea)
        return iou

    def track_shot(self, scene_faces):
        # Face tracking.
        tracks = []
        while True:  # continuously search for consecutive faces.
            track = []
            for frame_faces in scene_faces:
                for face in frame_faces:
                    if track == []:
                        track.append(face)
                        frame_faces.remove(face)
                        break
                    elif face['frame'] - track[-1]['frame'] <= self.num_failed_det:
                        # The track tolerates up to 'num_failed_det' missed frames.
                        iou = self.bb_intersection_over_union(face['bbox'], track[-1]['bbox'])
                        # Require a minimum IoU between consecutive faces.
                        if iou > 0.5:
                            track.append(face)
                            frame_faces.remove(face)
                            break
                    else:
                        break
            if track == []:
                break
            elif len(track) > 1 and track[-1]['frame'] - track[0]['frame'] + 1 >= self.min_track:
                frame_num = np.array([f['frame'] for f in track])
                bboxes = np.array([np.array(f['bbox']) for f in track])
                frameI = np.arange(frame_num[0], frame_num[-1] + 1)
                bboxesI = []
                for ij in range(0, 4):
                    # Missing boxes can be filled in by linear interpolation.
                    interpfn = interp1d(frame_num, bboxes[:, ij])
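                    # e.g. detections at frames [10, 12] with x1 values
                    # [100, 110] yield an interpolated x1 of 105 at the missed
                    # frame 11, so every frame in frameI gets a complete box.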
                    bboxesI.append(interpfn(frameI))
                bboxesI = np.stack(bboxesI, axis=1)
                if max(np.mean(bboxesI[:, 2] - bboxesI[:, 0]),
                       np.mean(bboxesI[:, 3] - bboxesI[:, 1])) > self.min_face_size:
                    # Keep only tracks whose mean face size exceeds 'min_face_size'.
                    tracks.append({'frame': frameI, 'bbox': bboxesI})
        return tracks

    def crop_video(self, track, frames, audio):
        # Crop the face clips.
        crop_frames = []
        dets = {'x': [], 'y': [], 's': []}
        for det in track['bbox']:
            dets['s'].append(max((det[3] - det[1]), (det[2] - det[0])) / 2)
            dets['y'].append((det[1] + det[3]) / 2)  # crop center y
            dets['x'].append((det[0] + det[2]) / 2)  # crop center x
        for fidx, frame in enumerate(track['frame']):
            cs = self.crop_scale
            bs = dets['s'][fidx]  # detection box size
            bsi = int(bs * (1 + 2 * cs))  # pad the image by this amount
            image = frames[frame]
            frame = np.pad(image, ((bsi, bsi), (bsi, bsi), (0, 0)), 'constant',
                           constant_values=(110, 110))
            my = dets['y'][fidx] + bsi  # bbox center Y in the padded image
            mx = dets['x'][fidx] + bsi  # bbox center X in the padded image
            face = frame[int(my - bs):int(my + bs * (1 + 2 * cs)),
                         int(mx - bs * (1 + cs)):int(mx + bs * (1 + cs))]
            crop_frames.append(cv2.resize(face, (224, 224)))
        # 640 audio samples per video frame at 16 kHz / 25 fps.
        cropaudio = audio[track['frame'][0] * 640:(track['frame'][-1] + 1) * 640]
        return {'track': track, 'proc_track': dets, 'data': [crop_frames, cropaudio]}

    def evaluate_asd(self, tracks):
        # Active speaker detection with the pretrained TalkNet model.
        all_scores = []
        for ins in tracks:
            video, audio = ins['data']
            audio_feature = python_speech_features.mfcc(audio, 16000, numcep=13,
                                                        winlen=0.025, winstep=0.010)
            video_feature = []
            for frame in video:
                face = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                h0, w0 = face.shape
                interp = cv2.INTER_CUBIC if (h0 < 224 or w0 < 224) else cv2.INTER_AREA
                face = cv2.resize(face, (224, 224), interpolation=interp)
                # Center-crop the 112x112 region used by TalkNet.
                face = face[int(112 - (112 / 2)):int(112 + (112 / 2)),
                            int(112 - (112 / 2)):int(112 + (112 / 2))]
                video_feature.append(face)
            video_feature = np.array(video_feature)
            # Align the audio (100 MFCC frames per second) and video (25 fps)
            # feature lengths.
            length = min((audio_feature.shape[0] - audio_feature.shape[0] % 4) / 100,
                         video_feature.shape[0] / 25)
            audio_feature = audio_feature[:int(round(length * 100)), :]
            video_feature = video_feature[:int(round(length * 25)), :, :]
            audio_feature = np.expand_dims(audio_feature, axis=0).astype(np.float32)
            video_feature = np.expand_dims(video_feature, axis=0).astype(np.float32)
            score = self.visual_models.asd_score(audio_feature, video_feature)
            all_score = np.asarray(score, dtype=np.float32)
            all_scores.append(all_score)
            try:
                del audio_feature, video_feature, score
            except Exception:
                pass
        return all_scores

    def evaluate_fr(self, frames, tracks, scores):
        SMOOTH_W = 4
        ON_THRESHOLD = 0.0
        OFF_THRESHOLD = -0.5
        QUALITY_HIGH = 0.0
        QUALITY_LOW = -0.3
        # First smooth each track's scores: median filter followed by a simple
        # moving average.
        smooth_scores_all = []
        for score in scores:
            s = np.asarray(score).flatten()
            if s.size == 0:
                smooth_scores_all.append(s)
                continue
            s_med = signal.medfilt(s, kernel_size=5 if len(s) >= 5 else 3)
            k = np.ones(5) / 5
            s_avg = np.convolve(s_med, k, mode='same')
            smooth_scores_all.append(s_avg)

        # Aggregate the candidate faces per frame.
        faces = [[] for _ in range(len(frames))]
        for tidx, track in enumerate(tracks):
            score = smooth_scores_all[tidx]
            for fidx, frame in enumerate(track['track']['frame'].tolist()):
                s = score[max(fidx - SMOOTH_W, 0):min(fidx + SMOOTH_W + 1, len(score))]
                s = float(np.mean(s))
                bbox = track['track']['bbox'][fidx]
                bbox = bbox.astype(np.int32)
                face = frames[frame][max(bbox[1], 0):min(bbox[3], frames[frame].shape[0]),
                                     max(bbox[0], 0):min(bbox[2], frames[frame].shape[1])]
                faces[frame].append({'track': tidx, 'score': s, 'facedata': face, 'bbox': bbox})
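        # How the scores computed above feed the decision below: each
        # candidate's score is the mean over a 2 * SMOOTH_W + 1 = 9 frame
        # window (about 0.36 s at 25 fps) of the median-filtered, averaged
        # track scores, so single-frame ASD spikes are suppressed. Only
        # OFF_THRESHOLD actually gates the "speaking" frames here;
        # ON_THRESHOLD and the QUALITY_* constants take effect only if the
        # commented-out face quality branch is enabled.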
        # Per-frame decision accumulators.
        active_facial_embs = {
            'frameI': [],
            'trackI': [],
            'faceI': [],
            'face': [],
            'face_bbox': [],
            'feat': [],
            'lip': [],
            'lip_bbox': [],
        }
        # Simple per-frame decision: pick the candidate with the highest score.
        for fidx in range(0, len(faces), max(1, self.face_det_stride)):
            if len(faces[fidx]) == 0:
                continue
            best = max(faces[fidx], key=lambda x: x['score'])
            res = self.visual_models.detect_lip(best['facedata'])
            # Skip when no lips are detected; this also filters out
            # low-resolution, low-quality faces.
            if res is None or res.get('lip_crop') is None:
                continue
            # As long as at least one face is detected in this frame, save the
            # most likely speaker face (best['facedata']), whether or not it is
            # actually speaking.
            active_facial_embs['faceI'].append(fidx)
            active_facial_embs['face'].append(best['facedata'])  # BGR ndarray
            active_facial_embs['lip'].append(res.get('lip_crop'))  # BGR ndarray
            active_facial_embs['face_bbox'].append(best['bbox'])  # in full-frame coordinates
            active_facial_embs['lip_bbox'].append(res.get('lip_bbox'))  # relative to the face crop
            feature = self.visual_models.get_face_embedding(best['facedata'])
            active_facial_embs['feat'].append(feature)  # full-face embedding

            s = best['score']
            if s < OFF_THRESHOLD:
                continue
            # Face quality assessment (optional; when enabled, only
            # higher-quality face frames pass the check below).
            # face_q_score = self.visual_models.face_quality_score(best['facedata'])
            # if (face_q_score >= QUALITY_HIGH) or (face_q_score >= QUALITY_LOW and s >= ON_THRESHOLD):
            if s >= OFF_THRESHOLD:
                # feature, feature_normalized = self.visual_models.get_face_embedding(best['facedata'])
                # Keep only the frames the model considers active speech.
                active_facial_embs['frameI'].append(fidx)
                active_facial_embs['trackI'].append(best['track'])

        # Convert to numpy.
        active_facial_embs['frameI'] = np.array(active_facial_embs['frameI'], dtype=np.int32)
        active_facial_embs['trackI'] = np.array(active_facial_embs['trackI'], dtype=np.int32)
        active_facial_embs['faceI'] = np.array(active_facial_embs['faceI'], dtype=np.int32)
        active_facial_embs['face_bbox'] = (np.array(active_facial_embs['face_bbox'], dtype=np.int32)
                                           if active_facial_embs['face_bbox'] else np.empty((0, 4), np.int32))
        active_facial_embs['lip_bbox'] = (np.array(active_facial_embs['lip_bbox'], dtype=np.int32)
                                          if active_facial_embs['lip_bbox'] else np.empty((0, 4), np.int32))
        active_facial_embs['feat'] = (np.vstack(active_facial_embs['feat'])
                                      if active_facial_embs['feat'] else np.empty((0, 512), np.float32))
        return active_facial_embs

    def visualization(self, frames, tracks, scores, embs=None):
        # First aggregate every track's per-frame bbox/score info (consistent
        # with the original implementation).
        faces = [[] for _ in range(len(frames))]
        for tidx, track in enumerate(tracks):
            score = scores[tidx]
            for fidx, frame in enumerate(track['track']['frame'].tolist()):
                s = score[max(fidx - 2, 0):min(fidx + 3, len(score))]  # len(score) caps the window
                s = np.mean(s)
                faces[frame].append({'track': tidx, 'score': float(s),
                                     'bbox': track['track']['bbox'][fidx]})

        # Build the set of saved (frame, track) pairs (relative to this shot).
        feat_set = set()
        lip_bbox_dict = {}  # lip bounding boxes keyed by frame index
        if embs is not None:
            if 'frameI' in embs and embs['frameI'].size > 0:
                trackI = embs.get('trackI')
                feat_set = set((int(f), int(t)) for f, t in
                               zip(embs['frameI'].tolist(), trackI.tolist()))
            if 'lip_bbox' in embs and embs['lip_bbox'].size > 0:
                for i, frame_idx in enumerate(embs['faceI']):
                    lip_bbox_dict[int(frame_idx)] = embs['lip_bbox'][i]

        for fidx, image in enumerate(frames):
            for face in faces[fidx]:
                bbox = face['bbox']
                x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
                # The lip bbox is stored relative to the face crop, so shift it
                # by the face's top-left corner to get full-frame coordinates.
                lip_bbox = None
                if fidx in lip_bbox_dict:
                    lip_bbox = lip_bbox_dict[fidx]
                    lip_x1 = x1 + lip_bbox[0]
                    lip_y1 = y1 + lip_bbox[1]
                    lip_x2 = x1 + lip_bbox[2]
                    lip_y2 = y1 + lip_bbox[3]
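                    # e.g. a face box with top-left (x1, y1) = (300, 100) and a
                    # lip box of (20, 60, 80, 100) within the crop puts the lips
                    # at (320, 160)-(380, 200) in full-frame coordinates.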
                if (fidx, face['track']) in feat_set:
                    # Green marks a saved speaking face; blue marks the lips
                    # (OpenCV colors are BGR).
                    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    if lip_bbox is not None:
                        cv2.rectangle(image, (lip_x1, lip_y1), (lip_x2, lip_y2), (255, 0, 0), 2)
                    txt = round(face['score'], 2)
                    cv2.putText(image, '%s' % (txt), (x1, max(y1 - 6, 0)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
                else:
                    # Red marks a face that was not saved as speaking.
                    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)
                    if lip_bbox is not None:
                        cv2.rectangle(image, (lip_x1, lip_y1), (lip_x2, lip_y2), (255, 0, 0), 2)
                    txt = round(face['score'], 2)
                    cv2.putText(image, '%s' % (txt), (x1, max(y1 - 6, 0)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 1)
            # Write the annotated frame to the output video.
            self.v_out.write(image)

    def close(self):
        try:
            if hasattr(self, "active_facial_embs"):
                for k, v in self.active_facial_embs.items():
                    if isinstance(v, np.ndarray):
                        del v
                    elif isinstance(v, list):
                        v.clear()
                self.active_facial_embs.clear()
        except Exception as e:
            print(f"[WARN] Error while closing VisionProcesser: {e}")
        gc.collect()

    def __del__(self):
        self.close()
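
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, with assumed pieces): `VisualModels` is a
# hypothetical bundle object standing in for however this repo loads the five
# models listed in the module docstring; it only needs to expose the methods
# used above (detect_faces, asd_score, detect_lip, get_face_embedding and,
# optionally, face_quality_score). The conf keys match those read in __init__;
# the values are placeholders, not tuned defaults.
if __name__ == '__main__':
    from visual_models import VisualModels  # hypothetical import

    conf = {
        'min_track': 10,       # minimum track length in frames
        'num_failed_det': 10,  # tolerated gap (frames) inside a face track
        'crop_scale': 0.40,    # extra padding around the detected face box
        'min_face_size': 1,    # minimum mean face size in pixels
        'face_det_stride': 1,  # run face detection on every N-th frame
        'shot_stride': 250,    # process the video in shots of N frames
    }
    audio_vad = [[0.0, 10.0]]  # VAD segments in seconds: [[start, end], ...]

    processer = VisionProcesser(
        video_file_path='demo.mp4',
        audio_file_path='demo.wav',
        audio_vad=audio_vad,
        out_feat_path='demo_feats.pkl',
        visual_models=VisualModels(),
        conf=conf,
        out_video_path='demo_vis.mp4',  # optional debug visualization
    )
    processer.run()
    processer.close()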