import os, torch, numpy, cv2, random, glob, python_speech_features
from scipy.io import wavfile
def generate_audio_set(dataPath, batchList):
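    """Read the .wav file of every clip in the batch into a dict keyed by
    clip name, so overlap() can borrow noise audio from other clips."""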
audioSet = {}
for line in batchList:
data = line.split('\t')
videoName = data[0][:11]
dataName = data[0]
_, audio = wavfile.read(os.path.join(dataPath, videoName, dataName + '.wav'))
audioSet[dataName] = audio
return audioSet
def overlap(dataName, audio, audioSet):
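    """Mix another clip from the batch into `audio` as background noise at a
    random SNR in [-5, 5] dB; the noise is tiled or truncated to match length."""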
    noiseName = random.choice([key for key in audioSet if key != dataName])
noiseAudio = audioSet[noiseName]
    snr = random.uniform(-5, 5)  # target signal-to-noise ratio in dB
if len(noiseAudio) < len(audio):
shortage = len(audio) - len(noiseAudio)
noiseAudio = numpy.pad(noiseAudio, (0, shortage), 'wrap')
else:
noiseAudio = noiseAudio[:len(audio)]
    # Cast to float64 before squaring so int16 samples cannot overflow.
    noiseDB = 10 * numpy.log10(numpy.mean(noiseAudio.astype(numpy.float64) ** 2) + 1e-4)
    cleanDB = 10 * numpy.log10(numpy.mean(audio.astype(numpy.float64) ** 2) + 1e-4)
    # Scale the noise so the clean-to-noise power ratio matches the target SNR.
    noiseAudio = numpy.sqrt(10 ** ((cleanDB - noiseDB - snr) / 10)) * noiseAudio
    audio = audio.astype(numpy.float64) + noiseAudio
    # Clip before the int16 cast to avoid integer wrap-around.
    return numpy.clip(audio, -32768, 32767).astype(numpy.int16)
def load_audio(data, dataPath, numFrames, audioAug, audioSet = None):
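    """Compute 13-dim MFCC features for one clip. The window and step are
    scaled by 25 / fps so exactly 4 MFCC frames align with each video frame."""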
dataName = data[0]
fps = float(data[2])
audio = audioSet[dataName]
    if audioAug:
        # With probability 0.5, mix in another clip from the batch as noise.
        if random.randint(0, 1) == 1:
            audio = overlap(dataName, audio, audioSet)
    # fps is not always 25; to keep the audio aligned with the video frames,
    # scale the MFCC window and step by 25 / fps.
    audio = python_speech_features.mfcc(audio, 16000, numcep = 13, winlen = 0.025 * 25 / fps, winstep = 0.010 * 25 / fps)
    maxAudio = int(numFrames * 4)  # 4 MFCC frames (10 ms step) per video frame
    if audio.shape[0] < maxAudio:
        shortage = maxAudio - audio.shape[0]
        audio = numpy.pad(audio, ((0, shortage), (0, 0)), 'wrap')
    audio = audio[:maxAudio, :]
return audio
def load_visual(data, dataPath, numFrames, visualAug):
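    """Load up to numFrames face crops for one clip as grayscale 112x112
    images; when visualAug is set, one transform (orig/flip/crop/rotate) is
    sampled per clip and applied to every frame."""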
dataName = data[0]
videoName = data[0][:11]
faceFolderPath = os.path.join(dataPath, videoName, dataName)
faceFiles = glob.glob("%s/*.jpg"%faceFolderPath)
    sortedFaceFiles = sorted(faceFiles, key=lambda path: float(os.path.splitext(os.path.basename(path))[0]))
faces = []
H = 112
    if visualAug:
        new = int(H * random.uniform(0.7, 1))
        # Upper bound is H - new + 1 so the crop origin is valid when new == H.
        x, y = numpy.random.randint(0, H - new + 1), numpy.random.randint(0, H - new + 1)
        M = cv2.getRotationMatrix2D((H / 2, H / 2), random.uniform(-15, 15), 1)
        augType = random.choice(['orig', 'flip', 'crop', 'rotate'])
    else:
        augType = 'orig'
for faceFile in sortedFaceFiles[:numFrames]:
face = cv2.imread(faceFile)
face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
face = cv2.resize(face, (H,H))
if augType == 'orig':
faces.append(face)
elif augType == 'flip':
faces.append(cv2.flip(face, 1))
elif augType == 'crop':
            faces.append(cv2.resize(face[y:y + new, x:x + new], (H, H)))
elif augType == 'rotate':
faces.append(cv2.warpAffine(face, M, (H,H)))
faces = numpy.array(faces)
return faces
def load_label(data, numFrames):
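    """Parse the per-frame speaking labels (a stringified list such as
    "[0,1,1]") from the trial line and truncate them to numFrames entries."""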
res = []
labels = data[3].replace('[', '').replace(']', '')
labels = labels.split(',')
for label in labels:
res.append(int(label))
res = numpy.array(res[:numFrames])
return res
class train_loader(object):
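    """Map-style dataset that returns one pre-built mini-batch per index:
    clips are grouped so each batch holds roughly batchSize video frames."""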
def __init__(self, trialFileName, audioPath, visualPath, batchSize, **kwargs):
self.audioPath = audioPath
self.visualPath = visualPath
self.miniBatch = []
        with open(trialFileName) as f:
            mixLst = f.read().splitlines()
        # Sort the training set by clip length (descending) so clips of similar
        # length share a batch; the secondary key breaks ties so same-length
        # clips in one batch are less likely to come from the same movie.
        sortedMixLst = sorted(mixLst, key=lambda data: (int(data.split('\t')[1]), int(data.split('\t')[-1])), reverse=True)
start = 0
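        # Greedily pack consecutive clips into mini-batches: each batch takes
        # max(batchSize // length, 1) clips, so longer clips form smaller
        # batches and the frame count per batch stays near batchSize.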
while True:
length = int(sortedMixLst[start].split('\t')[1])
end = min(len(sortedMixLst), start + max(int(batchSize / length), 1))
self.miniBatch.append(sortedMixLst[start:end])
if end == len(sortedMixLst):
break
start = end
def __getitem__(self, index):
batchList = self.miniBatch[index]
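        # The batch is sorted longest-first, so its last clip is the shortest;
        # every clip is truncated to that length for uniform tensor shapes.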
numFrames = int(batchList[-1].split('\t')[1])
audioFeatures, visualFeatures, labels = [], [], []
audioSet = generate_audio_set(self.audioPath, batchList) # load the audios in this batch to do augmentation
for line in batchList:
data = line.split('\t')
audioFeatures.append(load_audio(data, self.audioPath, numFrames, audioAug = True, audioSet = audioSet))
            visualFeatures.append(load_visual(data, self.visualPath, numFrames, visualAug = True))
labels.append(load_label(data, numFrames))
return torch.FloatTensor(numpy.array(audioFeatures)), \
torch.FloatTensor(numpy.array(visualFeatures)), \
torch.LongTensor(numpy.array(labels))
def __len__(self):
return len(self.miniBatch)
class val_loader(object):
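    """Map-style dataset for validation: one clip per index, no augmentation."""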
def __init__(self, trialFileName, audioPath, visualPath, **kwargs):
self.audioPath = audioPath
self.visualPath = visualPath
        with open(trialFileName) as f:
            self.miniBatch = f.read().splitlines()
def __getitem__(self, index):
line = [self.miniBatch[index]]
numFrames = int(line[0].split('\t')[1])
audioSet = generate_audio_set(self.audioPath, line)
data = line[0].split('\t')
audioFeatures = [load_audio(data, self.audioPath, numFrames, audioAug = False, audioSet = audioSet)]
        visualFeatures = [load_visual(data, self.visualPath, numFrames, visualAug = False)]
labels = [load_label(data, numFrames)]
return torch.FloatTensor(numpy.array(audioFeatures)), \
torch.FloatTensor(numpy.array(visualFeatures)), \
torch.LongTensor(numpy.array(labels))
def __len__(self):
return len(self.miniBatch)
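
# A minimal usage sketch (hypothetical: the trial-list and data paths below
# are placeholders, and the real training script may differ). Each
# __getitem__ already returns a full mini-batch, so the torch DataLoader
# wrapper uses batch_size = 1; that adds a leading singleton dimension which
# the training loop would squeeze out.
if __name__ == "__main__":
    loader = train_loader(trialFileName = "train_loader.csv",
                          audioPath = "clips_audios",
                          visualPath = "clips_videos",
                          batchSize = 2500)
    trainLoader = torch.utils.data.DataLoader(loader, batch_size = 1, shuffle = True, num_workers = 4)
    for audioFeature, visualFeature, labels in trainLoader:
        print(audioFeature.shape, visualFeature.shape, labels.shape)
        break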