import math
from collections import defaultdict

import numpy as np
from loguru import logger


def sample_from_clip(
    lmdb_manager,
    audio_file,
    audio_each_file,
    pose_each_file,
    trans_each_file,
    trans_v_each_file,
    shape_each_file,
    facial_each_file,
    word_each_file,
    vid_each_file,
    emo_each_file,
    sem_each_file,
    args,
    ori_stride,
    ori_length,
    disable_filtering,
    clean_first_seconds,
    clean_final_seconds,
    is_test,
    n_out_samples,
):
    """Sample clips from the data according to specified parameters.

    Slices one recording (pose + optional audio/facial/word/... streams)
    into fixed-length training clips and writes them to the LMDB via
    ``lmdb_manager``, once per ratio in ``args.multi_length_training``.

    Returns:
        tuple: ``(n_filtered_out, n_out_samples)`` — a mapping from
        filter-reason key to rejected-clip count (currently always empty;
        filtering is a stub downstream), and the updated running count of
        samples written.
    """
    # NOTE(review): `disable_filtering` is accepted but unused — the
    # per-clip filtering below is a stub; kept for interface compatibility.
    round_seconds_skeleton = pose_each_file.shape[0] // args.pose_fps

    # Clamp the usable duration to the shortest available modality.
    timing_info = calculate_timing_info(
        audio_each_file, facial_each_file, round_seconds_skeleton,
        args.audio_fps, args.pose_fps, args.audio_sr, args.audio_rep
    )
    round_seconds_skeleton = timing_info['final_seconds']

    # Convert the cleaned [start, end] seconds into audio/pose frame indices.
    clip_info = calculate_clip_boundaries(
        round_seconds_skeleton, clean_first_seconds, clean_final_seconds,
        args.audio_fps, args.pose_fps
    )

    n_filtered_out = defaultdict(int)

    # One sampling pass per training-length ratio (multi-length training).
    for ratio in args.multi_length_training:
        processed_data = process_data_with_ratio(
            ori_stride, ori_length, ratio, clip_info, args, is_test,
            audio_each_file, pose_each_file, trans_each_file,
            trans_v_each_file, shape_each_file, facial_each_file,
            word_each_file, vid_each_file, emo_each_file, sem_each_file,
            audio_file, lmdb_manager, n_out_samples
        )
        for type_key, count in processed_data['filtered_counts'].items():
            n_filtered_out[type_key] += count
        n_out_samples = processed_data['n_out_samples']

    return n_filtered_out, n_out_samples


def calculate_timing_info(audio_data, facial_data, round_seconds_skeleton,
                          audio_fps, pose_fps, audio_sr, audio_rep):
    """Calculate timing information for the data.

    Computes the usable duration in whole seconds as the minimum over the
    modalities that are present (pose always; audio/facial when not None),
    logging a warning when any modality is truncated.

    Returns:
        dict: ``{'final_seconds': int}``.
    """
    if audio_data is not None:
        # BUGFIX: the "mfcc" test must come before the generic
        # `!= "wave16k"` test; in the original order it was unreachable
        # ("mfcc" != "wave16k" matched the first branch first).
        if audio_rep == "mfcc":
            round_seconds_audio = audio_data.shape[0] // audio_fps
        elif audio_rep != "wave16k":
            round_seconds_audio = len(audio_data) // audio_fps
        else:
            # Raw 16k waveform: one second spans `audio_sr` samples.
            round_seconds_audio = audio_data.shape[0] // audio_sr

        if facial_data is not None:
            round_seconds_facial = facial_data.shape[0] // pose_fps
            logger.info(f"audio: {round_seconds_audio}s, pose: {round_seconds_skeleton}s, facial: {round_seconds_facial}s")
            final_seconds = min(round_seconds_audio, round_seconds_skeleton, round_seconds_facial)
            max_round = max(round_seconds_audio, round_seconds_skeleton, round_seconds_facial)
            if final_seconds != max_round:
                logger.warning(f"reduce to {final_seconds}s, ignore {max_round-final_seconds}s")
        else:
            logger.info(f"pose: {round_seconds_skeleton}s, audio: {round_seconds_audio}s")
            final_seconds = min(round_seconds_audio, round_seconds_skeleton)
            max_round = max(round_seconds_audio, round_seconds_skeleton)
            if final_seconds != max_round:
                logger.warning(f"reduce to {final_seconds}s, ignore {max_round-final_seconds}s")
    else:
        # No audio: the pose stream alone bounds the duration.
        final_seconds = round_seconds_skeleton

    return {
        'final_seconds': final_seconds
    }


def calculate_clip_boundaries(round_seconds, clean_first_seconds,
                              clean_final_seconds, audio_fps, pose_fps):
    """Calculate the boundaries for clip sampling.

    Trims ``clean_first_seconds`` from the start and ``clean_final_seconds``
    from the end, then expresses the resulting window both in seconds and
    in audio/pose frame indices.

    Returns:
        dict: start/end in seconds (``clip_s_t``/``clip_e_t``) and in
        audio frames (``clip_s_f_audio``/``clip_e_f_audio``) and pose
        frames (``clip_s_f_pose``/``clip_e_f_pose``).
    """
    clip_s_t = clean_first_seconds
    clip_e_t = round_seconds - clean_final_seconds
    return {
        'clip_s_t': clip_s_t,
        'clip_e_t': clip_e_t,
        'clip_s_f_audio': audio_fps * clip_s_t,
        'clip_e_f_audio': clip_e_t * audio_fps,
        'clip_s_f_pose': clip_s_t * pose_fps,
        'clip_e_f_pose': clip_e_t * pose_fps
    }


def process_data_with_ratio(ori_stride, ori_length, ratio, clip_info, args,
                            is_test, audio_data, pose_data, trans_data,
                            trans_v_data, shape_data, facial_data, word_data,
                            vid_data, emo_data, sem_data, audio_file,
                            lmdb_manager, n_out_samples):
    """Process data with a specific training length ratio.

    Slides a window of ``ori_length * ratio`` pose frames with stride
    ``ori_stride * ratio`` over the cleaned range, extracts one sample per
    window position, and appends it to the LMDB.

    Returns:
        dict: ``{'filtered_counts': defaultdict, 'n_out_samples': int}``.
    """
    if is_test and not args.test_clip:
        # Test mode without clipping: one clip spanning the whole range.
        cut_length = clip_info['clip_e_f_pose'] - clip_info['clip_s_f_pose']
        # NOTE(review): mutating args.stride is a side effect that callers
        # and extract_sample_data rely on — preserved as-is.
        args.stride = cut_length
    else:
        args.stride = int(ratio * ori_stride)
        cut_length = int(ori_length * ratio)

    num_subdivision = math.floor(
        (clip_info['clip_e_f_pose'] - clip_info['clip_s_f_pose'] - cut_length) / args.stride
    ) + 1
    logger.info(f"pose from frame {clip_info['clip_s_f_pose']} to {clip_info['clip_e_f_pose']}, length {cut_length}")
    logger.info(f"{num_subdivision} clips is expected with stride {args.stride}")

    # BUGFIX: audio_short_length was only bound when audio_data is not
    # None, yet it was passed to extract_sample_data unconditionally,
    # raising NameError for audio-less clips. Default it to 0 (unused in
    # that case, since extract_sample_data only reads it when audio exists).
    audio_short_length = 0
    if audio_data is not None:
        audio_short_length = math.floor(cut_length / args.pose_fps * args.audio_fps)
        logger.info(f"audio from frame {clip_info['clip_s_f_audio']} to {clip_info['clip_e_f_audio']}, length {audio_short_length}")

    # Filtering is a stub: nothing increments these counts yet.
    filtered_counts = defaultdict(int)
    for i in range(num_subdivision):
        sample_data = extract_sample_data(
            i, clip_info, cut_length, args,
            audio_data, pose_data, trans_data, trans_v_data, shape_data,
            facial_data, word_data, vid_data, emo_data, sem_data,
            audio_file, audio_short_length
        )

        # BUGFIX: the original guard was `pose.any() is not None`, which is
        # always True (.any() returns a numpy bool, never None). The slice
        # is likewise never None, so every clip is included — stated
        # honestly here and kept as a hook for future filtering.
        if sample_data['pose'] is not None:
            lmdb_manager.add_sample([
                sample_data['pose'], sample_data['audio'],
                sample_data['facial'], sample_data['shape'],
                sample_data['word'], sample_data['emo'],
                sample_data['sem'], sample_data['vid'],
                sample_data['trans'], sample_data['trans_v'],
                sample_data['audio_name']
            ])
            n_out_samples += 1

    return {
        'filtered_counts': filtered_counts,
        'n_out_samples': n_out_samples
    }


def extract_sample_data(idx, clip_info, cut_length, args, audio_data,
                        pose_data, trans_data, trans_v_data, shape_data,
                        facial_data, word_data, vid_data, emo_data, sem_data,
                        audio_file, audio_short_length):
    """Extract a single sample from the data.

    Slices all modality streams for window ``idx``. Optional modalities
    whose rep flag in ``args`` is None are stood in for by ``np.array([-1])``
    placeholders so the sample tuple always has a fixed layout.

    Returns:
        dict: one entry per modality plus ``audio_name``.
    """
    start_idx = clip_info['clip_s_f_pose'] + idx * args.stride
    fin_idx = start_idx + cut_length

    sample_data = {
        'pose': pose_data[start_idx:fin_idx],
        'trans': trans_data[start_idx:fin_idx],
        'trans_v': trans_v_data[start_idx:fin_idx],
        'shape': shape_data[start_idx:fin_idx],
        'facial': facial_data[start_idx:fin_idx] if args.facial_rep is not None else np.array([-1]),
        'word': word_data[start_idx:fin_idx] if args.word_rep is not None else np.array([-1]),
        'emo': emo_data[start_idx:fin_idx] if args.emo_rep is not None else np.array([-1]),
        'sem': sem_data[start_idx:fin_idx] if args.sem_rep is not None else np.array([-1]),
        'vid': vid_data[start_idx:fin_idx] if args.id_rep is not None else np.array([-1]),
        'audio_name': audio_file
    }

    if audio_data is not None:
        # Map the pose-frame window start into audio frames at the same
        # temporal offset, then take `audio_short_length` audio frames.
        audio_start = clip_info['clip_s_f_audio'] + math.floor(idx * args.stride * args.audio_fps / args.pose_fps)
        audio_end = audio_start + audio_short_length
        sample_data['audio'] = audio_data[audio_start:audio_end]
    else:
        sample_data['audio'] = np.array([-1])

    return sample_data