#!/usr/bin/env python
"""
data_io
Interface to process waveforms.
Note that functions here are based on numpy, and they are intended to be used
before data are converted into torch tensors.
data on disk -> DataSet.__getitem__()  ----->  Collate  ----->  Pytorch model
                         numpy.array               torch.tensor

These functions don't work on pytorch tensors
"""
from __future__ import absolute_import
import os
import sys
import numpy as np
import scipy.io.wavfile
import soundfile
import core_scripts.data_io.io_tools as nii_io_tk
__author__ = "Xin Wang"
__email__ = "wangxin@nii.ac.jp"
__copyright__ = "Copyright 2021, Xin Wang"
def wavformRaw2MuLaw(wavdata, bit=16, signed=True, quanLevel = 256.0):
"""
    wavConverted = wavformRaw2MuLaw(wavdata, bit=16, signed=True, \
                                    quanLevel = 256.0)
    Assume wavdata is int type:
        step1. convert int wav -> float wav
        step2. convert linear-scale wav -> mu-law wav

    Args:
      wavdata: np array of int-16 or int-32 waveform
      bit: number of bits used to encode the input waveform
      signed: whether the input waveform is signed
      quanLevel: level of quantization (default 2 ^ 8)
    Return:
      wav: mu-law quantization indices, stored as float numbers
"""
if wavdata.dtype != np.int16 and wavdata.dtype != np.int32:
print("Input waveform data in not int16 or int32")
sys.exit(1)
# convert to float numbers
    if signed:
wavdata = np.array(wavdata, dtype=np.float32) / \
np.power(2.0, bit-1)
else:
wavdata = np.array(wavdata, dtype=np.float32) / \
np.power(2.0, bit)
tmp_quan_level = quanLevel - 1
# mu-law compansion
wavtrans = np.sign(wavdata) * \
np.log(1.0 + tmp_quan_level * np.abs(wavdata)) / \
np.log(1.0 + tmp_quan_level)
wavtrans = np.round((wavtrans + 1.0) * tmp_quan_level / 2.0)
return wavtrans
def wavformMuLaw2Raw(wavdata, quanLevel = 256.0):
"""
    wavformMuLaw2Raw(wavdata, quanLevel = 256.0)
Convert Mu-law waveform back to raw waveform
Args:
wavdata: np array
quanLevel: level of quantization (default: 2 ^ 8)
Return:
raw waveform: np array, float
"""
tmp_quan_level = quanLevel - 1
wavdata = wavdata * 2.0 / tmp_quan_level - 1.0
wavdata = np.sign(wavdata) * (1.0/ tmp_quan_level) * \
(np.power(quanLevel, np.abs(wavdata)) - 1.0)
return wavdata
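
# A minimal round-trip sketch (not part of the original interface): encode an
# int16 sine wave with wavformRaw2MuLaw and decode it back with
# wavformMuLaw2Raw. The test signal is an arbitrary demo choice.
def _demo_mulaw_roundtrip():
    """Encode an int16 sine wave to mu-law indices and decode it back"""
    t = np.linspace(0, 1.0, 16000, endpoint=False)
    wav_float = 0.5 * np.sin(2 * np.pi * 440 * t)
    wav_int16 = np.asarray(wav_float * np.power(2.0, 15), dtype=np.int16)
    # quantization indices in [0, quanLevel - 1], stored as float numbers
    mulaw = wavformRaw2MuLaw(wav_int16, bit=16, signed=True, quanLevel=256.0)
    # decoded waveform is float in (-1, 1); 8-bit quantization error remains
    decoded = wavformMuLaw2Raw(mulaw, quanLevel=256.0)
    print("max abs round-trip error:", np.max(np.abs(decoded - wav_float)))
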
def float2wav(rawData, wavFile, bit=16, samplingRate = 16000):
"""
    float2wav(rawData, wavFile, bit=16, samplingRate = 16000)
    Convert float waveform into waveform in int
    This is identical to waveFloatToPCMFile
    To be removed

    Args:
      rawData: float waveform data in np-array
      wavFile: output file path
      bit: number of bits to encode waveform in output *.wav
      samplingRate: sampling rate of the output *.wav
"""
rawData = rawData * np.power(2.0, bit-1)
rawData[rawData >= np.power(2.0, bit-1)] = np.power(2.0, bit-1)-1
rawData[rawData < -1*np.power(2.0, bit-1)] = -1*np.power(2.0, bit-1)
# write as signed 16bit PCM
if bit == 16:
rawData = np.asarray(rawData, dtype=np.int16)
elif bit == 32:
rawData = np.asarray(rawData, dtype=np.int32)
else:
print("Only be able to save wav in int16 and int32 type")
print("Save to int16")
rawData = np.asarray(rawData, dtype=np.int16)
scipy.io.wavfile.write(wavFile, samplingRate, rawData)
return
def waveReadAsFloat(wavFileIn):
""" sr, wavData = wavReadToFloat(wavFileIn)
Wrapper over scipy.io.wavfile
Return:
sr: sampling_rate
wavData: waveform in np.float32 (-1, 1)
"""
sr, wavdata = scipy.io.wavfile.read(wavFileIn)
    if wavdata.dtype == np.dtype(np.int16):
        wavdata = np.array(wavdata, dtype=np.float32) / \
                  np.power(2.0, 16-1)
    elif wavdata.dtype == np.dtype(np.int32):
        wavdata = np.array(wavdata, dtype=np.float32) / \
                  np.power(2.0, 32-1)
    elif wavdata.dtype == np.dtype(np.float32):
pass
else:
print("Unknown waveform format %s" % (wavFileIn))
sys.exit(1)
return sr, wavdata
def waveFloatToPCMFile(waveData, wavFile, bit=16, sr=16000):
"""waveSaveFromFloat(waveData, wavFile, bit=16, sr=16000)
Save waveData (np.float32) as PCM *.wav
Args:
waveData: waveform data as np.float32
wavFile: output PCM waveform file
bit: PCM bits
sr: sampling rate
"""
    # recover to the integer amplitude range, e.g., [-32768, +32767] for 16-bit
rawData = waveData * np.power(2.0, bit-1)
rawData[rawData >= np.power(2.0, bit-1)] = np.power(2.0, bit-1)-1
rawData[rawData < -1*np.power(2.0, bit-1)] = -1*np.power(2.0, bit-1)
# write as signed 16bit PCM
if bit == 16:
rawData = np.asarray(rawData, dtype=np.int16)
elif bit == 32:
rawData = np.asarray(rawData, dtype=np.int32)
else:
print("Only be able to save wav in int16 and int32 type")
print("Save to int16")
rawData = np.asarray(rawData, dtype=np.int16)
scipy.io.wavfile.write(wavFile, sr, rawData)
return
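
# A minimal write/read sketch (not part of the original interface): save a
# float sine wave as 16-bit PCM with waveFloatToPCMFile and load it back with
# waveReadAsFloat. The temporary file path exists only for this demo.
def _demo_pcm_roundtrip():
    """Write a sine wave as PCM *.wav and read it back as float"""
    import tempfile
    t = np.linspace(0, 1.0, 16000, endpoint=False)
    wav = np.asarray(0.5 * np.sin(2 * np.pi * 220 * t), dtype=np.float32)
    with tempfile.TemporaryDirectory() as tmp_dir:
        wav_path = os.path.join(tmp_dir, "demo.wav")
        waveFloatToPCMFile(wav, wav_path, bit=16, sr=16000)
        sr, wav_loaded = waveReadAsFloat(wav_path)
    # the difference is bounded by the 16-bit quantization step
    print("sr:", sr, "max abs error:", np.max(np.abs(wav_loaded - wav)))
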
def flacReadAsFloat(wavFileIn):
""" sr, wavData = flacReadAsFloat(wavFileIn)
Wrapper over soundfile.read
Return:
sr: sampling_rate
      wavData: waveform in (-1, 1), np.float64 by default from soundfile.read
"""
x, sr = soundfile.read(wavFileIn)
return sr, x
def buffering(x, n, p=0, opt=None):
"""buffering(x, n, p=0, opt=None)
input
-----
x: np.array, input signal, (length, )
n: int, window length
p: int, overlap, not frame shift
outpupt
-------
output: np.array, framed buffer, (frame_num, frame_length)
Example
-------
framed = buffer(wav, 320, 80, 'nodelay')
Code from https://stackoverflow.com/questions/38453249/
"""
if opt not in ('nodelay', None):
raise ValueError('{} not implemented'.format(opt))
i = 0
if opt == 'nodelay':
# No zeros at array start
result = x[:n]
i = n
else:
# Start with `p` zeros
result = np.hstack([np.zeros(p), x[:n-p]])
i = n-p
# Make 2D array, cast to list for .append()
result = list(np.expand_dims(result, axis=0))
while i < len(x):
# Create next column, add `p` results from last col if given
col = x[i:i+(n-p)]
if p != 0:
col = np.hstack([result[-1][-p:], col])
# Append zeros if last row and not length `n`
if len(col):
col = np.hstack([col, np.zeros(n - len(col))])
# Combine result with next row
result.append(np.array(col))
i += (n - p)
return np.vstack(result).astype(x.dtype)
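
# A minimal framing sketch (not part of the original interface): with n=8 and
# p=6 (overlap 6, i.e., frame shift 2), a 16-sample input yields
# (16 - 8) // 2 + 1 = 5 frames.
def _demo_buffering():
    """Show the framed buffer produced by buffering"""
    x = np.arange(16, dtype=np.float32)
    framed = buffering(x, n=8, p=6, opt='nodelay')
    print(framed.shape)  # (5, 8)
    print(framed[0])     # [0. 1. 2. 3. 4. 5. 6. 7.]
    print(framed[1])     # [2. 3. 4. 5. 6. 7. 8. 9.]
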
def windowing(framed_buffer, window_type='hanning'):
"""windowing(framed_buffer, window_type='hanning')
input
-----
framed_buffer: np.array, (frame_num, frame_length), output of buffering
window_type: str, default 'hanning'
"""
if window_type == 'hanning':
window = np.hanning(framed_buffer.shape[1])
    else:
        raise ValueError("Unknown window type in windowing")
return framed_buffer * window.astype(framed_buffer.dtype)
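
# A minimal windowing sketch (not part of the original interface): frame an
# all-ones signal and apply the Hanning window; each windowed frame then
# equals np.hanning(frame_length), with zeros at both frame edges.
def _demo_windowing():
    """Frame an all-ones signal and apply the Hanning window"""
    x = np.ones(32, dtype=np.float32)
    framed = buffering(x, n=8, p=4, opt='nodelay')
    windowed = windowing(framed)
    print(windowed[0])  # equals np.hanning(8) since the input is all ones
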
def silence_handler(wav, sr, fl=320, fs=80,
max_thres_below=30,
min_thres=-55,
shortest_len_in_ms=50,
flag_output=0):
"""silence_handler(wav, sr, fs, fl)
input
-----
wav: np.array, (wav_length, ), wavform data
sr: int, sampling rate
fl: int, frame length, default 320
fs: int, frame shift, in number of waveform poings, default 80
flag_output: int, flag to select output
0: return wav_no_sil, sil_wav, time_tag
1: return wav_no_sil
2: return sil_wav
max_thres_below: int, default 30, max_enenergy - max_thres_below
is the lower threshold for speech frame
min_thres: int, default -55, the lower threshold for speech frame
shortest_len_in_ms: int, ms, default 50 ms,
segment less than this length is treated as speech
output
------
wav_no_sil: np.array, (length_1, ), waveform after removing silence
sil_wav: np.array, (length_2, ), waveform in silence regions
time_tag: [[start, end], []], where
Note: output depends on flag_output
"""
assert fs < fl, "Frame shift should be smaller than frame length"
frames = buffering(wav, fl, fl - fs, 'nodelay')
windowed_frames = windowing(frames)
frame_energy = 20*np.log10(np.std(frames, axis=1)+np.finfo(np.float32).eps)
frame_energy_max = np.max(frame_energy)
frame_tag = np.bitwise_and(
(frame_energy > (frame_energy_max - max_thres_below)),
frame_energy > min_thres)
    # use the builtin int (np.int was removed from recent NumPy versions)
    frame_tag = np.asarray(frame_tag, dtype=int)
seg_len_thres = shortest_len_in_ms * sr / 1000 / fs
    def ignore_short_seg(frame_tag, seg_len_thres):
        """Set 1-valued segments shorter than seg_len_thres to 0"""
        frame_tag_new = frame_tag.copy()
# boundary of each segment
seg_bound = np.diff(np.concatenate(([0], frame_tag, [0])))
# start of each segment
seg_start = np.argwhere(seg_bound == 1)[:, 0]
# end of each segment
seg_end = np.argwhere(seg_bound == -1)[:, 0]
assert seg_start.shape[0] == seg_end.shape[0], \
"Fail to extract segment boundaries"
# length of segment
seg_len = seg_end - seg_start
seg_short_ids = np.argwhere(seg_len < seg_len_thres)[:, 0]
for idx in seg_short_ids:
start_frame_idx = seg_start[idx]
end_frame_idx = seg_end[idx]
frame_tag_new[start_frame_idx:end_frame_idx] = 0
return frame_tag_new
# work on non-speech, 1-frame_tag indicates non-speech frames
frame_process_sil = ignore_short_seg(1-frame_tag, seg_len_thres)
# reverse the sign
frame_process_sil = 1 - frame_process_sil
# work on speech
frame_process_all = ignore_short_seg(frame_process_sil, seg_len_thres)
# separate non-speech and speech segments
# do overlap and add
frame_tag = frame_process_all
# buffer for speech segments
spe_buf = np.zeros([np.sum(frame_tag) * fs + fl], dtype=wav.dtype)
# buffer for non-speech segments
sil_buf = np.zeros([np.sum(1-frame_tag) * fs + fl], dtype=wav.dtype)
spe_fr_pt = 0
non_fr_pt = 0
for frame_idx, flag_speech in enumerate(frame_tag):
if flag_speech:
spe_buf[spe_fr_pt*fs:spe_fr_pt*fs+fl] += windowed_frames[frame_idx]
spe_fr_pt += 1
else:
sil_buf[non_fr_pt*fs:non_fr_pt*fs+fl] += windowed_frames[frame_idx]
non_fr_pt += 1
if flag_output == 1:
return spe_buf
elif flag_output == 2:
return sil_buf
else:
return spe_buf, sil_buf, frame_tag
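
# A minimal usage sketch (not part of the original interface): build a toy
# signal with a silent gap in the middle and split it with silence_handler.
# Signal values and durations are arbitrary demo choices.
def _demo_silence_handler():
    """Separate the loud and the silent parts of a toy signal"""
    sr = 16000
    t = np.linspace(0, 0.5, sr // 2, endpoint=False)
    tone = 0.5 * np.sin(2 * np.pi * 440 * t)
    gap = np.zeros(sr // 2)
    wav = np.concatenate([tone, gap, tone])
    spe_buf, sil_buf, frame_tag = silence_handler(wav, sr)
    print("speech samples:", spe_buf.shape[0])
    print("silence samples:", sil_buf.shape[0])
    print("speech frames:", int(np.sum(frame_tag)))
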
if __name__ == "__main__":
print("Definition of tools for wav")