library_name: transformers
tags:
- prosody
- segmentation
- audio
- speech
language:
- sl
base_model:
- facebook/w2v-bert-2.0
Wav2Vec2Bert Audio frame classifier for prosodic unit detection
This model predicts prosodic units on speech. For each 20ms frame the model predicts 1 or 0, indicating whether there is a prosodic unit in this frame or not.
This frame-level output can be grouped into events with the frames_to_intervals function provided in the code snippets below.
It is known that the model is unreliable if the audio starts or ends within a prosodic unit. This can be somewhat circumvented by 1) using the largest possible chunks that will fit your machine and 2) using overlapping chunks and combining the results smartly.
Model Details
Model Description
This is the model card of a 🤗 transformers model that has been pushed on the Hub. This model card has been automatically generated.
- Developed by: Peter Rupnik, Nikola Ljubešić, Darinka Verdonik, Simona Majhenič
- Funded by: MEZZANINE project
- Model type: Wav2Vec2Bert for Audio Frame Classification
- Language(s) (NLP): Trained and tested on Slovenian; it is currently unclear whether the model is usable cross-lingually
- Finetuned from model: facebook/w2v-bert-2.0
Uses
Simple use (short files)
For shorter audios that fit on your GPU the classifier can be used directly.
import numpy as np
from datasets import Audio, Dataset
from transformers import AutoFeatureExtractor, Wav2Vec2BertForAudioFrameClassification
import torch
import numpy as np
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the feature extractor and the frame classifier from the same checkpoint
# and move the classifier to the selected device.
model_name = "5roop/Wav2Vec2BertProsodicUnitsFrameClassifier"
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
model = Wav2Vec2BertForAudioFrameClassification.from_pretrained(model_name)
model = model.to(device)

# Path of the audio file to analyse.
f = "data/Rog-Art-N-G6007-P600702_181.070_211.070.wav"
def frames_to_intervals(frames: list) -> list[tuple]:
from itertools import pairwise
import pandas as pd
results = []
ndf = pd.DataFrame(
data={
"time_s": [0.020 * i for i in range(len(frames))],
"frames": frames,
}
)
ndf = ndf.dropna()
indices_of_change = ndf.frames.diff()[ndf.frames.diff() != 0].index.values
for si, ei in pairwise(indices_of_change):
if ndf.loc[si : ei - 1, "frames"].mode()[0] == 0:
pass
else:
results.append(
(round(ndf.loc[si, "time_s"], 3), round(ndf.loc[ei - 1, "time_s"], 3))
)
return results
def evaluator(chunks):
    """Classify every frame of a batch of audio examples.

    Args:
        chunks: Batch mapping with an ``"audio"`` entry; each item provides
            ``"array"`` (the waveform) and ``"sampling_rate"``.

    Returns:
        Dict with the per-frame class ids (``"y_pred"``), the raw logits
        (``"y_pred_logits"``) and, per example, the intervals produced by
        ``frames_to_intervals`` (``"prosodic_units"``).
    """
    audio_items = chunks["audio"]
    # The sampling rate of the first item is used for the whole batch.
    sampling_rate = audio_items[0]["sampling_rate"]
    waveforms = [item["array"] for item in audio_items]

    # Inference only: no gradients are needed.
    with torch.no_grad():
        features = feature_extractor(
            waveforms,
            return_tensors="pt",
            sampling_rate=sampling_rate,
        ).to(device)
        logits = model(**features).logits

    y_pred_raw = np.array(logits.cpu())
    # Highest-scoring class per frame.
    y_pred = y_pred_raw.argmax(axis=-1)
    return {
        "y_pred": y_pred,
        "y_pred_logits": y_pred_raw,
        "prosodic_units": [frames_to_intervals(row) for row in y_pred],
    }
# Build a two-row dataset (the same file twice) and let `datasets` decode the
# audio as 16 kHz mono.
ds = Dataset.from_dict({"audio": [f, f]})
ds = ds.cast_column("audio", Audio(16000, mono=True))

# Run the classifier over the dataset in batches of two examples.
ds = ds.map(evaluator, batched=True, batch_size=2)

# Per-frame labels of the first example.
print(ds["y_pred"][0])
# Outputs: [0, 0, 1, 1, 1, 1, 1, ...]

# Raw per-frame logits of the first example.
print(ds["y_pred_logits"][0])
# Outputs:
# [[ 0.89419061, -0.77746612],
# [ 0.44213724, -0.34862748],
# [-0.08605709, 0.13012762],
# ....

# Prosodic units of the first example as [start_s, end_s] pairs.
print(ds["prosodic_units"][0])
# Outputs: [[0.04, 2.4], [3.52, 6.6], ....
Inference on longer files
If the file is too big for straightforward inference, it needs to be split into chunks before processing. The probability of false negatives is known to increase at the starts and ends of chunks, so it is best to process the file with some overlap between chunks or to split it on silence. We illustrate the former approach here:
import numpy as np
from datasets import Audio, Dataset
from transformers import AutoFeatureExtractor, Wav2Vec2BertForAudioFrameClassification
import torch
import numpy as np
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the feature extractor and the frame classifier from the same checkpoint
# and move the classifier to the selected device.
model_name = "5roop/Wav2Vec2BertProsodicUnitsFrameClassifier"
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
model = Wav2Vec2BertForAudioFrameClassification.from_pretrained(model_name)
model = model.to(device)

# Path of the (long) audio file to analyse.
f = "ROG/ROG-Art/WAV/Rog-Art-N-G5025-P600022.wav"

# Chunking parameters, first in seconds ...
OVERLAP_S = 10
CHUNK_LENGTH_S = 30
SAMPLING_RATE = 16_000
# ... then converted to sample counts.
OVERLAP_SAMPLES = OVERLAP_S * SAMPLING_RATE
CHUNK_LENGTH_SAMPLES = CHUNK_LENGTH_S * SAMPLING_RATE
def frames_to_intervals(frames: list) -> list[tuple]:
from itertools import pairwise
import pandas as pd
results = []
ndf = pd.DataFrame(
data={
"time_s": [0.020 * i for i in range(len(frames))],
"frames": frames,
}
)
ndf = ndf.dropna()
indices_of_change = ndf.frames.diff()[ndf.frames.diff() != 0].index.values
for si, ei in pairwise(indices_of_change):
if ndf.loc[si : ei - 1, "frames"].mode()[0] == 0:
pass
else:
results.append(
(round(ndf.loc[si, "time_s"], 3), round(ndf.loc[ei - 1, "time_s"], 3))
)
return results
def merge_events(events: list[list[float]], centroids):
    """Merge per-chunk events into a single list without overlapping events.

    Events from all chunks are pooled and ordered by start time. Whenever an
    event overlaps the most recently kept one, only one of the two survives:
    the one whose midpoint lies closer to the centroid of the chunk that
    produced it (ties keep the earlier event).

    Each event stays paired with its own chunk centroid while sorting; sorting
    the events alone would leave the centroid list in chunk order and attach
    the wrong centroid to reordered events.

    Args:
        events: One entry per chunk, each a sequence of ``[start, end]``
            events.
        centroids: One centroid per chunk, in the same units as the event
            boundaries; ``centroids[i]`` belongs to ``events[i]``.

    Returns:
        The surviving events, ordered by start time.
    """
    # Tag every event with the centroid of the chunk it came from.
    tagged = []
    for batch_idx, batch in enumerate(events):
        chunk_centroid = centroids[batch_idx]
        for event in batch:
            tagged.append((event, chunk_centroid))
    # Stable sort by start time; the centroid travels with its event.
    tagged.sort(key=lambda pair: pair[0][0])

    merged = []
    for event, centroid in tagged:
        if not merged:
            merged.append((event, centroid))
            continue

        last_event, last_centroid = merged[-1]
        overlaps = (last_event[0] < event[1]) and (last_event[1] > event[0])
        if not overlaps:
            merged.append((event, centroid))
            continue

        # Distance between each event's midpoint and its own chunk centroid.
        last_distance = abs(last_centroid - (last_event[0] + last_event[1]) / 2)
        current_distance = abs(centroid - (event[0] + event[1]) / 2)
        if current_distance < last_distance:
            merged[-1] = (event, centroid)

    return [event for event, _ in merged]
def evaluator(chunks):
    """Classify every frame of a batch of audio chunks.

    Args:
        chunks: Batch mapping with ``"audio"`` (items providing ``"array"``),
            ``"start"`` and ``"end"`` (slice bounds into the array, in
            samples).

    Returns:
        Dict with the per-frame class ids (``"y_pred"``), the raw logits
        (``"y_pred_logits"``) and, per chunk, the intervals from
        ``frames_to_intervals`` shifted by the chunk's start time in seconds
        (``"prosodic_units"``).
    """
    chunk_starts = chunks["start"]

    # Inference only: no gradients are needed.
    with torch.no_grad():
        # Cut each chunk's own window out of the decoded waveform.
        samples = [
            audio["array"][lo:hi]
            for audio, lo, hi in zip(chunks["audio"], chunk_starts, chunks["end"])
        ]
        features = feature_extractor(
            samples,
            return_tensors="pt",
            sampling_rate=SAMPLING_RATE,
        ).to(device)
        logits = model(**features).logits

    y_pred_raw = np.array(logits.cpu())
    # Highest-scoring class per frame.
    y_pred = y_pred_raw.argmax(axis=-1)

    # Convert chunk-relative interval times to file-relative times.
    prosodic_units = []
    for frame_labels, lo in zip(y_pred, chunk_starts):
        offset_s = lo / SAMPLING_RATE
        prosodic_units.append(np.array(frames_to_intervals(frame_labels)) + offset_s)

    return {
        "y_pred": y_pred,
        "y_pred_logits": y_pred_raw,
        "prosodic_units": prosodic_units,
    }
# Decode the file once to learn its length in samples.
audio_duration_samples = (
    Audio(SAMPLING_RATE, mono=True)
    .decode_example({"path": f, "bytes": None})["array"]
    .shape[0]
)

# Chunk starts are spaced so that consecutive chunks share OVERLAP_SAMPLES.
chunk_starts = np.arange(
    0, audio_duration_samples, CHUNK_LENGTH_SAMPLES - OVERLAP_SAMPLES
)
# The chunk before a given start ends at start + OVERLAP_SAMPLES. If that
# already reaches the end of the audio, the chunk is redundant and may be only
# a few samples long (too short for feature extraction), so drop it.
chunk_starts = chunk_starts[
    (chunk_starts == 0) | (chunk_starts + OVERLAP_SAMPLES < audio_duration_samples)
]
# Clip the ends to the real audio length so the last chunk's centroid reflects
# the audio it actually contains.
chunk_ends = np.minimum(chunk_starts + CHUNK_LENGTH_SAMPLES, audio_duration_samples)

# One dataset row per chunk: the file path (decoded as audio), the slice
# bounds in samples and the chunk centre in seconds.
ds = Dataset.from_dict(
    {
        "audio": [f for _ in chunk_starts],
        "start": chunk_starts,
        "end": chunk_ends,
        "chunk_centroid_s": (chunk_starts + chunk_ends) / 2 / SAMPLING_RATE,
    }
).cast_column("audio", Audio(SAMPLING_RATE, mono=True))

# Classify all chunks, then reconcile events found in overlapping regions.
ds = ds.map(evaluator, batched=True, batch_size=10)
final_intervals = merge_events(ds["prosodic_units"], ds["chunk_centroid_s"])
print(final_intervals)
# Outputs: [[3.14, 4.96], [5.6, 8.4], [8.62, 9.32], [10.12, 10.7], [11.72, 13.1],....
Training Details
| hyperparameter | value |
|---|---|
| learning rate | 3e-5 |
| batch size | 1 |
| gradient accumulation steps | 16 |
| num train epochs | 20 |
| weight decay | 0.01 |