Update prosody_preprocessor.py
Browse files- prosody_preprocessor.py +41 -4
prosody_preprocessor.py
CHANGED
|
@@ -9,6 +9,8 @@ import dataclasses
|
|
| 9 |
import parselmouth
|
| 10 |
from transformers import PreTrainedModel,PretrainedConfig, FeatureExtractionMixin
|
| 11 |
from datasets import Dataset
|
|
|
|
|
|
|
| 12 |
|
| 13 |
@dataclass
|
| 14 |
class SpeakerStats:
|
|
@@ -75,6 +77,10 @@ class ProsodyPreprocessor(FeatureExtractionMixin):
|
|
| 75 |
|
| 76 |
def extract_features(self, audio):
|
| 77 |
"""Extract F0 and intensity features"""
|
|
|
|
|
|
|
|
|
|
|
|
|
| 78 |
|
| 79 |
audio = torch.Tensor(audio)
|
| 80 |
|
|
@@ -114,7 +120,6 @@ class ProsodyPreprocessor(FeatureExtractionMixin):
|
|
| 114 |
def collect_stats(self, dataset: Dataset, num_proc: int = 4, batch_size: int = 32) -> Dict[str, SpeakerStats]:
|
| 115 |
"""First pass: collect speaker statistics using dataset.map"""
|
| 116 |
|
| 117 |
-
# Step 1: Extract features using map
|
| 118 |
def extract_features_batch(examples):
|
| 119 |
features_list = []
|
| 120 |
for audio in examples['audio']:
|
|
@@ -127,7 +132,6 @@ class ProsodyPreprocessor(FeatureExtractionMixin):
|
|
| 127 |
'speaker_id': examples['speaker_id']
|
| 128 |
}
|
| 129 |
|
| 130 |
-
# Extract features for all samples
|
| 131 |
features_dataset = dataset.map(
|
| 132 |
extract_features_batch,
|
| 133 |
batched=True,
|
|
@@ -138,7 +142,6 @@ class ProsodyPreprocessor(FeatureExtractionMixin):
|
|
| 138 |
)
|
| 139 |
|
| 140 |
|
| 141 |
-
# Step 2: Group features by speaker
|
| 142 |
speaker_features = {}
|
| 143 |
for item in features_dataset:
|
| 144 |
|
|
@@ -149,7 +152,6 @@ class ProsodyPreprocessor(FeatureExtractionMixin):
|
|
| 149 |
speaker_features[speaker_id]['f0'].append(item['f0'])
|
| 150 |
speaker_features[speaker_id]['intensity'].append(item['intensity'])
|
| 151 |
|
| 152 |
-
# Step 3: Calculate stats per speaker
|
| 153 |
self.speaker_stats = {
|
| 154 |
spk: SpeakerStats.from_features(
|
| 155 |
feats['f0'],
|
|
@@ -217,3 +219,38 @@ class ProsodyPreprocessor(FeatureExtractionMixin):
|
|
| 217 |
# # spk: SpeakerStats(**stats)
|
| 218 |
# # for spk, stats in state_dict.items()
|
| 219 |
# # }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 9 |
import parselmouth
|
| 10 |
from transformers import PreTrainedModel,PretrainedConfig, FeatureExtractionMixin
|
| 11 |
from datasets import Dataset
|
| 12 |
+
from scipy.signal import medfilt
|
| 13 |
+
import scipy.interpolate as scipy_interp
|
| 14 |
|
| 15 |
@dataclass
|
| 16 |
class SpeakerStats:
|
|
|
|
| 77 |
|
| 78 |
def extract_features(self, audio):
|
| 79 |
"""Extract F0 and intensity features"""
|
| 80 |
+
|
| 81 |
+
|
| 82 |
+
# Override the original method to fix a bug
|
| 83 |
+
pYAAPT.PitchObj.interpolate = interpolate
|
| 84 |
|
| 85 |
audio = torch.Tensor(audio)
|
| 86 |
|
|
|
|
| 120 |
def collect_stats(self, dataset: Dataset, num_proc: int = 4, batch_size: int = 32) -> Dict[str, SpeakerStats]:
|
| 121 |
"""First pass: collect speaker statistics using dataset.map"""
|
| 122 |
|
|
|
|
| 123 |
def extract_features_batch(examples):
|
| 124 |
features_list = []
|
| 125 |
for audio in examples['audio']:
|
|
|
|
| 132 |
'speaker_id': examples['speaker_id']
|
| 133 |
}
|
| 134 |
|
|
|
|
| 135 |
features_dataset = dataset.map(
|
| 136 |
extract_features_batch,
|
| 137 |
batched=True,
|
|
|
|
| 142 |
)
|
| 143 |
|
| 144 |
|
|
|
|
| 145 |
speaker_features = {}
|
| 146 |
for item in features_dataset:
|
| 147 |
|
|
|
|
| 152 |
speaker_features[speaker_id]['f0'].append(item['f0'])
|
| 153 |
speaker_features[speaker_id]['intensity'].append(item['intensity'])
|
| 154 |
|
|
|
|
| 155 |
self.speaker_stats = {
|
| 156 |
spk: SpeakerStats.from_features(
|
| 157 |
feats['f0'],
|
|
|
|
| 219 |
# # spk: SpeakerStats(**stats)
|
| 220 |
# # for spk, stats in state_dict.items()
|
| 221 |
# # }
|
| 222 |
+
|
| 223 |
+
|
| 224 |
+
def interpolate(self):
    """Fill unvoiced (zero-F0) frames of a pYAAPT ``PitchObj`` by interpolation.

    Intended as a monkey-patch replacement for ``pYAAPT.PitchObj.interpolate``:
    the upstream version mishandles the extrapolated points before the first
    voiced frame and after the last voiced frame, which could flatten the
    whole F0 contour.  Reads ``self.samp_values`` (frame-wise F0, 0 means
    unvoiced) and writes the interpolated contour to ``self.samp_interp``.
    """
    # Work on a copy of the raw per-frame pitch values.
    pitch = np.zeros(self.nframes)
    pitch[:] = self.samp_values
    # Median-filter first so isolated spurious voiced frames do not anchor
    # the interpolant.
    pitch2 = medfilt(self.samp_values, self.SMOOTH_FACTOR)

    # Voiced-region boundaries, plus the raw endpoint values used below to
    # decide whether leading/trailing unvoiced stretches need back-filling.
    edges = self.edges_finder(pitch)
    first_sample = pitch[0]
    last_sample = pitch[-1]

    if len(np.nonzero(pitch2)[0]) < 2:
        # Fewer than two voiced frames: nothing to interpolate between, so
        # fall back to the speaker-typical pitch value everywhere.
        pitch[pitch == 0] = self.PTCH_TYP
    else:
        # Shape-preserving (PCHIP) interpolation through the voiced frames;
        # only unvoiced positions are overwritten — voiced frames keep their
        # raw F0 values.
        nz_pitch = pitch2[pitch2 > 0]
        pitch2 = scipy_interp.pchip(np.nonzero(pitch2)[0],
                                    nz_pitch)(range(self.nframes))
        pitch[pitch == 0] = pitch2[pitch == 0]
    if self.SMOOTH > 0:
        pitch = medfilt(pitch, self.SMOOTH_FACTOR)
    try:
        if first_sample == 0:
            # Guard against edges[0] == 0: previously that caused the whole
            # F0 contour to be flattened to a single value.
            if edges[0] == 0:
                edges[0] = 1
            pitch[:edges[0] - 1] = pitch[edges[0]]
        if last_sample == 0:
            pitch[edges[-1] + 1:] = pitch[edges[-1]]
    except IndexError:
        # `edges` can be empty when no voiced region was detected; the
        # PTCH_TYP fallback above already filled the contour, so back-filling
        # the ends is safely skipped.  (Was a bare `except:` — narrowed so
        # real bugs are no longer swallowed.)
        pass
    self.samp_interp = pitch
|