import os, torch, numpy, cv2, random, glob, python_speech_features
from scipy.io import wavfile
def generate_audio_set(dataPath, batchList):
    # Pre-load every audio track referenced by this mini-batch, so the
    # augmentation step can mix noise across clips of the same batch.
    audioSet = {}
    for line in batchList:
        data = line.split('\t')
        videoName = data[0][:11]  # the first 11 characters of the entity id are the video id
        dataName = data[0]
        _, audio = wavfile.read(os.path.join(dataPath, videoName, dataName + '.wav'))
        audioSet[dataName] = audio
    return audioSet
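
# Trial-file format (inferred from the parsing above and in the loaders below,
# not spelled out in this file): each line is tab-separated as
#   <dataName>\t<numFrames>\t<fps>\t[label,label,...]\t...
# where dataName embeds the 11-character video id as its prefix.
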
def overlap(dataName, audio, audioSet):
    # Augmentation: overlay a randomly chosen other clip from the batch as
    # background noise at a random SNR in [-5, 5] dB.
    noiseName = random.choice([name for name in audioSet if name != dataName])
    noiseAudio = audioSet[noiseName]
    snr = random.uniform(-5, 5)
    if len(noiseAudio) < len(audio):
        shortage = len(audio) - len(noiseAudio)
        noiseAudio = numpy.pad(noiseAudio, (0, shortage), 'wrap')
    else:
        noiseAudio = noiseAudio[:len(audio)]
    # Cast to float64 before squaring: squaring int16 samples silently overflows.
    noiseDB = 10 * numpy.log10(numpy.mean(noiseAudio.astype(numpy.float64) ** 2) + 1e-4)
    cleanDB = 10 * numpy.log10(numpy.mean(audio.astype(numpy.float64) ** 2) + 1e-4)
    # Scale the noise so that cleanDB - noiseDB equals the target SNR, then mix.
    noiseAudio = numpy.sqrt(10 ** ((cleanDB - noiseDB - snr) / 10)) * noiseAudio
    audio = audio + noiseAudio
    return audio.astype(numpy.int16)
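
# Sanity check of the scaling above (a hedged worked example, not from the
# original source): with cleanDB = 60, noiseDB = 70 and a target snr of 0 dB,
# the factor is sqrt(10 ** ((60 - 70 - 0) / 10)) = sqrt(0.1) ~= 0.316, which
# attenuates the noise power by 10 dB so both signals end up at equal power.
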
def load_audio(data, dataPath, numFrames, audioAug, audioSet=None):
    dataName = data[0]
    fps = float(data[2])
    audio = audioSet[dataName]
    if audioAug:
        # With probability 0.5, overlay another clip from the batch as noise.
        if random.randint(0, 1) == 1:
            audio = overlap(dataName, audio, audioSet)
    # fps is not always 25; to keep the audio features aligned with the visual
    # frames, scale the MFCC window and step by 25 / fps.
    audio = python_speech_features.mfcc(audio, 16000, numcep=13, winlen=0.025 * 25 / fps, winstep=0.010 * 25 / fps)
    # Pad (by wrapping) or truncate to exactly 4 MFCC frames per video frame.
    maxAudio = int(numFrames * 4)
    if audio.shape[0] < maxAudio:
        shortage = maxAudio - audio.shape[0]
        audio = numpy.pad(audio, ((0, shortage), (0, 0)), 'wrap')
    audio = audio[:maxAudio, :]
    return audio
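
# Why 4 MFCC frames per video frame: a 10 ms step yields 100 MFCC frames per
# second, and at the 25 fps reference rate that is 100 / 25 = 4; the fps-scaled
# winstep above preserves this 4:1 ratio for other frame rates.
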
def load_visual(data, dataPath, numFrames, visualAug):
    dataName = data[0]
    videoName = data[0][:11]
    faceFolderPath = os.path.join(dataPath, videoName, dataName)
    faceFiles = glob.glob("%s/*.jpg" % faceFolderPath)
    # Face crops are named by timestamp, so sort them numerically.
    sortedFaceFiles = sorted(faceFiles, key=lambda path: float(os.path.basename(path)[:-4]))
    faces = []
    H = 112
    if visualAug:
        # Pick one augmentation per clip so all frames are transformed consistently.
        new = int(H * random.uniform(0.7, 1))
        # max() guards against randint(0, 0), which raises when new == H.
        x, y = numpy.random.randint(0, max(H - new, 1)), numpy.random.randint(0, max(H - new, 1))
        M = cv2.getRotationMatrix2D((H / 2, H / 2), random.uniform(-15, 15), 1)
        augType = random.choice(['orig', 'flip', 'crop', 'rotate'])
    else:
        augType = 'orig'
    for faceFile in sortedFaceFiles[:numFrames]:
        face = cv2.imread(faceFile)
        face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
        face = cv2.resize(face, (H, H))
        if augType == 'orig':
            faces.append(face)
        elif augType == 'flip':
            faces.append(cv2.flip(face, 1))
        elif augType == 'crop':
            faces.append(cv2.resize(face[y:y + new, x:x + new], (H, H)))
        elif augType == 'rotate':
            faces.append(cv2.warpAffine(face, M, (H, H)))
    faces = numpy.array(faces)
    return faces
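
# The result has shape (numFrames, 112, 112): one grayscale 112x112 face crop
# per video frame, matching the 4-MFCC-frames-per-video-frame audio features.
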
def load_label(data, numFrames):
    # Parse the bracketed, comma-separated per-frame labels, e.g. "[0,1,1]".
    res = []
    labels = data[3].replace('[', '').replace(']', '')
    labels = labels.split(',')
    for label in labels:
        res.append(int(label))
    res = numpy.array(res[:numFrames])
    return res
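
# The labels are assumed to be binary, in the style of AVA-ActiveSpeaker
# annotations: 1 when the tracked face is speaking in that frame, 0 otherwise.
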
class train_loader(object):
    def __init__(self, trialFileName, audioPath, visualPath, batchSize, **kwargs):
        self.audioPath = audioPath
        self.visualPath = visualPath
        self.miniBatch = []
        mixLst = open(trialFileName).read().splitlines()
        # Sort the training set by clip length; the last column serves as a
        # shuffle key, so that equal-length clips grouped into the same batch
        # tend to come from different movies.
        sortedMixLst = sorted(mixLst, key=lambda data: (int(data.split('\t')[1]), int(data.split('\t')[-1])), reverse=True)
        # Build variable-size mini-batches: the longer the clips, the fewer of
        # them per batch, keeping the total frame count near batchSize.
        start = 0
        while True:
            length = int(sortedMixLst[start].split('\t')[1])
            end = min(len(sortedMixLst), start + max(int(batchSize / length), 1))
            self.miniBatch.append(sortedMixLst[start:end])
            if end == len(sortedMixLst):
                break
            start = end

    def __getitem__(self, index):
        batchList = self.miniBatch[index]
        # Truncate every clip to the shortest one in the batch; the list is
        # sorted by length in descending order, so the last line is shortest.
        numFrames = int(batchList[-1].split('\t')[1])
        audioFeatures, visualFeatures, labels = [], [], []
        audioSet = generate_audio_set(self.audioPath, batchList)  # pre-load the batch's audio; overlap() draws its noise from this set
        for line in batchList:
            data = line.split('\t')
            audioFeatures.append(load_audio(data, self.audioPath, numFrames, audioAug=True, audioSet=audioSet))
            visualFeatures.append(load_visual(data, self.visualPath, numFrames, visualAug=True))
            labels.append(load_label(data, numFrames))
        return torch.FloatTensor(numpy.array(audioFeatures)), \
               torch.FloatTensor(numpy.array(visualFeatures)), \
               torch.LongTensor(numpy.array(labels))

    def __len__(self):
        return len(self.miniBatch)

class val_loader(object):
    def __init__(self, trialFileName, audioPath, visualPath, **kwargs):
        self.audioPath = audioPath
        self.visualPath = visualPath
        self.miniBatch = open(trialFileName).read().splitlines()

    def __getitem__(self, index):
        # Evaluation uses one clip per batch and no augmentation.
        line = [self.miniBatch[index]]
        numFrames = int(line[0].split('\t')[1])
        audioSet = generate_audio_set(self.audioPath, line)
        data = line[0].split('\t')
        audioFeatures = [load_audio(data, self.audioPath, numFrames, audioAug=False, audioSet=audioSet)]
        visualFeatures = [load_visual(data, self.visualPath, numFrames, visualAug=False)]
        labels = [load_label(data, numFrames)]
        return torch.FloatTensor(numpy.array(audioFeatures)), \
               torch.FloatTensor(numpy.array(visualFeatures)), \
               torch.LongTensor(numpy.array(labels))

    def __len__(self):
        return len(self.miniBatch)
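
# A minimal usage sketch (hedged: the file names, paths, and batchSize below
# are illustrative placeholders, not defined by this module). Each __getitem__
# already returns a full mini-batch, so wrap the loader with batch_size=1 and
# squeeze the extra leading dimension downstream.
if __name__ == '__main__':
    loader = train_loader(trialFileName='train_loader.csv',    # hypothetical trial file
                          audioPath='clips_audios/train',      # hypothetical audio root
                          visualPath='clips_videos/train',     # hypothetical face-crop root
                          batchSize=2500)
    trainLoader = torch.utils.data.DataLoader(loader, batch_size=1, shuffle=True, num_workers=4)
    for audioFeature, visualFeature, labels in trainLoader:
        print(audioFeature.shape, visualFeature.shape, labels.shape)
        break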