| | import os, torch, numpy, cv2, random, glob, python_speech_features
|
| | from scipy.io import wavfile
|
| | from torchvision.transforms import RandomCrop
|
| |
|
def generate_audio_set(dataPath, batchList):
    """Read every utterance's waveform referenced by a mini-batch.

    Args:
        dataPath:  root directory holding <videoName>/<clipName>.wav files.
        batchList: tab-separated trial lines; field 0 is the clip name,
                   whose first 11 characters are the parent video id.

    Returns:
        dict mapping clip name -> raw waveform (numpy array from wavfile.read).
    """
    audioSet = {}
    for entry in batchList:
        clipName = entry.split('\t')[0]
        # The parent video id is the 11-character prefix of the clip name.
        wavPath = os.path.join(dataPath, clipName[:11], clipName + '.wav')
        _, samples = wavfile.read(wavPath)
        audioSet[clipName] = samples
    return audioSet
|
| |
|
def overlap(dataName, audio, audioSet):
    """Mix a randomly chosen interfering utterance into *audio* at a random SNR.

    Args:
        dataName: key of the clean utterance (excluded from noise candidates).
        audio:    clean waveform, int16 numpy array.
        audioSet: dict of clip name -> waveform; must contain at least one
                  key other than dataName.

    Returns:
        Noisy waveform as int16, same length as *audio*.
    """
    # random.sample() on a set raises TypeError on Python >= 3.11; pick from
    # an explicit list instead.
    candidates = [key for key in audioSet if key != dataName]
    noiseAudio = audioSet[random.choice(candidates)]
    snr = random.uniform(-5, 5)  # scalar, not a 1-element list
    if len(noiseAudio) < len(audio):
        shortage = len(audio) - len(noiseAudio)
        noiseAudio = numpy.pad(noiseAudio, (0, shortage), 'wrap')
    else:
        noiseAudio = noiseAudio[:len(audio)]
    # Cast to float64 BEFORE squaring: int16 ** 2 silently overflows and
    # wraps, corrupting the power estimate (the previous abs() only hid the
    # wrapped negative values, it did not fix them).
    noise = noiseAudio.astype(numpy.float64)
    clean = audio.astype(numpy.float64)
    noiseDB = 10 * numpy.log10(numpy.mean(noise ** 2) + 1e-4)
    cleanDB = 10 * numpy.log10(numpy.mean(clean ** 2) + 1e-4)
    # Scale the noise so that clean/noise power ratio equals the target SNR.
    noise = numpy.sqrt(10 ** ((cleanDB - noiseDB - snr) / 10)) * noise
    return (clean + noise).astype(numpy.int16)
|
| |
|
def load_audio(data, dataPath, numFrames, audioAug, audioSet = None):
    """Compute MFCC features for one utterance, aligned to the video frames.

    Args:
        data:      tab-split trial fields; [0] clip name, [2] video fps.
        dataPath:  unused here (audio comes pre-loaded via audioSet).
        numFrames: number of video frames; audio is trimmed/padded to 4x this
                   (MFCC hop is scaled so 4 audio steps cover one video frame).
        audioAug:  when True, overlay another batch utterance with prob. 0.5.
        audioSet:  dict of clip name -> waveform produced by generate_audio_set.

    Returns:
        (numFrames * 4, 13) numpy array of MFCCs.
    """
    clipName = data[0]
    fps = float(data[2])
    audio = audioSet[clipName]
    # Coin flip: with probability 0.5 mix in a second speaker as noise.
    if audioAug and random.randint(0, 1) == 1:
        audio = overlap(clipName, audio, audioSet)
    # winlen/winstep are scaled by 25/fps so the number of MFCC steps per
    # video frame stays fixed at 4 regardless of the clip's frame rate.
    feats = python_speech_features.mfcc(
        audio, 16000, numcep=13,
        winlen=0.025 * 25 / fps, winstep=0.010 * 25 / fps)
    maxAudio = int(numFrames * 4)
    if feats.shape[0] < maxAudio:
        # Wrap-pad short clips so the temporal length matches the video.
        feats = numpy.pad(feats, ((0, maxAudio - feats.shape[0]), (0, 0)), 'wrap')
    return feats[:int(round(numFrames * 4)), :]
|
| |
|
def load_visual(data, dataPath, numFrames, visualAug):
    """Load up to numFrames grayscale 112x112 face crops for one utterance.

    Args:
        data:      tab-split trial fields; [0] is the clip name (first 11
                   characters = parent video id).
        dataPath:  root directory holding <videoName>/<clipName>/*.jpg faces.
        numFrames: maximum number of frames to load.
        visualAug: when True, apply one randomly chosen augmentation
                   (original / horizontal flip / random crop / random rotation)
                   consistently across all frames of the clip.

    Returns:
        (T, 112, 112) numpy array of grayscale faces, T <= numFrames.
    """
    dataName = data[0]
    videoName = data[0][:11]
    faceFolderPath = os.path.join(dataPath, videoName, dataName)
    faceFiles = glob.glob("%s/*.jpg" % faceFolderPath)
    # Sort numerically by the timestamp encoded in the file name; use
    # basename/splitext instead of split('/') so this also works on Windows.
    sortedFaceFiles = sorted(
        faceFiles,
        key=lambda path: float(os.path.splitext(os.path.basename(path))[0]))
    faces = []
    H = 112
    if visualAug == True:
        new = int(H * random.uniform(0.7, 1))
        # random.uniform may return its upper bound, making new == H;
        # clamp the range so numpy.random.randint never gets an empty span.
        span = max(H - new, 1)
        x, y = numpy.random.randint(0, span), numpy.random.randint(0, span)
        M = cv2.getRotationMatrix2D((H / 2, H / 2), random.uniform(-15, 15), 1)
        augType = random.choice(['orig', 'flip', 'crop', 'rotate'])
    else:
        augType = 'orig'
    for faceFile in sortedFaceFiles[:numFrames]:
        face = cv2.imread(faceFile)
        face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
        face = cv2.resize(face, (H, H))
        if augType == 'orig':
            faces.append(face)
        elif augType == 'flip':
            faces.append(cv2.flip(face, 1))
        elif augType == 'crop':
            faces.append(cv2.resize(face[y:y + new, x:x + new], (H, H)))
        elif augType == 'rotate':
            faces.append(cv2.warpAffine(face, M, (H, H)))
    return numpy.array(faces)
|
| |
|
| |
|
def load_label(data, numFrames):
    """Parse the per-frame activity labels from a trial line.

    Args:
        data:      tab-split trial fields; [3] is a bracketed, comma-separated
                   list of integer labels, e.g. "[1, 0, 1]".
        numFrames: truncate the label sequence to this many frames.

    Returns:
        1-D numpy int array of at most numFrames labels.
    """
    stripped = data[3].replace('[', '').replace(']', '')
    parsed = [int(token) for token in stripped.split(',')]
    return numpy.array(parsed[:numFrames])
|
| |
|
class train_loader(object):
    """Training dataset: groups trial lines into length-bucketed mini-batches.

    Lines are sorted by frame count (descending) so each mini-batch contains
    clips of the same length; the number of clips per batch shrinks as clips
    get longer, keeping total frames per batch roughly equal to batchSize.
    """

    def __init__(self, trialFileName, audioPath, visualPath, batchSize, **kwargs):
        """
        Args:
            trialFileName: text file, one tab-separated trial per line;
                           field 1 is the frame count.
            audioPath:     root directory of the .wav files.
            visualPath:    root directory of the face-crop folders.
            batchSize:     target number of total frames per mini-batch.
        """
        self.audioPath = audioPath
        self.visualPath = visualPath
        self.miniBatch = []
        # Context manager so the trial file handle is closed deterministically
        # (the previous open(...).read() leaked the descriptor).
        with open(trialFileName) as trialFile:
            mixLst = trialFile.read().splitlines()
        # Sort by (frame count, last field) descending so equal-length clips
        # end up adjacent and can share a mini-batch.
        sortedMixLst = sorted(
            mixLst,
            key=lambda line: (int(line.split('\t')[1]), int(line.split('\t')[-1])),
            reverse=True)
        start = 0
        while start < len(sortedMixLst):
            length = int(sortedMixLst[start].split('\t')[1])
            # Longer clips -> fewer clips per batch; always take at least one.
            end = min(len(sortedMixLst), start + max(int(batchSize / length), 1))
            self.miniBatch.append(sortedMixLst[start:end])
            start = end

    def __getitem__(self, index):
        """Load one mini-batch of (audio MFCCs, face crops, labels) tensors."""
        batchList = self.miniBatch[index]
        # All clips are trimmed to the shortest clip in the batch (lists are
        # sorted descending, so the last line has the minimum frame count).
        numFrames = int(batchList[-1].split('\t')[1])
        audioFeatures, visualFeatures, labels = [], [], []
        audioSet = generate_audio_set(self.audioPath, batchList)
        for line in batchList:
            data = line.split('\t')
            audioFeatures.append(load_audio(data, self.audioPath, numFrames, audioAug = True, audioSet = audioSet))
            visualFeatures.append(load_visual(data, self.visualPath, numFrames, visualAug = True))
            labels.append(load_label(data, numFrames))
        return torch.FloatTensor(numpy.array(audioFeatures)), \
               torch.FloatTensor(numpy.array(visualFeatures)), \
               torch.LongTensor(numpy.array(labels))

    def __len__(self):
        """Number of mini-batches."""
        return len(self.miniBatch)
|
| |
|
| |
|
class val_loader(object):
    """Validation dataset: one trial line per item, no augmentation."""

    def __init__(self, trialFileName, audioPath, visualPath, **kwargs):
        """
        Args:
            trialFileName: text file, one tab-separated trial per line.
            audioPath:     root directory of the .wav files.
            visualPath:    root directory of the face-crop folders.
        """
        self.audioPath = audioPath
        self.visualPath = visualPath
        # Context manager so the trial file handle is closed deterministically
        # (the previous open(...).read() leaked the descriptor).
        with open(trialFileName) as trialFile:
            self.miniBatch = trialFile.read().splitlines()

    def __getitem__(self, index):
        """Load one clip's (audio MFCCs, face crops, labels) tensors."""
        line = [self.miniBatch[index]]
        numFrames = int(line[0].split('\t')[1])
        audioSet = generate_audio_set(self.audioPath, line)
        data = line[0].split('\t')
        audioFeatures = [load_audio(data, self.audioPath, numFrames, audioAug = False, audioSet = audioSet)]
        visualFeatures = [load_visual(data, self.visualPath, numFrames, visualAug = False)]
        labels = [load_label(data, numFrames)]
        return torch.FloatTensor(numpy.array(audioFeatures)), \
               torch.FloatTensor(numpy.array(visualFeatures)), \
               torch.LongTensor(numpy.array(labels))

    def __len__(self):
        """Number of validation clips."""
        return len(self.miniBatch)
|
| |
|