File size: 4,903 Bytes
4baa40f
 
 
 
 
2021a20
 
4baa40f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21f3853
 
 
d4d0c2c
 
 
 
 
6ea1d37
 
d4d0c2c
 
 
 
 
 
6ea1d37
d4d0c2c
 
 
 
21f3853
 
d4d0c2c
6ea1d37
21f3853
 
 
 
 
 
 
 
d4d0c2c
 
 
21f3853
 
 
 
 
2021a20
 
 
 
d4d0c2c
2021a20
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# SPDX-FileContributor: Karl El Hajal

import os
import subprocess

import numpy as np
import webrtcvad
from pydub import AudioSegment


VAD_SR = 16000
VAD_MODE = 3  # Aggressiveness level (0-3, where 3 is the most aggressive)
VAD_FRAME_DURATION = 10  # Frame duration in milliseconds

def get_speech_segments_webrtcvad(audio_array, sample_rate, frame_duration, vad_mode):
    """Detect speech regions in a mono 16-bit PCM sample array with WebRTC VAD.

    Args:
        audio_array: 1-D numpy array of int16 samples (mono).
        sample_rate: Sample rate in Hz (webrtcvad supports 8000/16000/32000/48000).
        frame_duration: VAD frame length in milliseconds (10, 20 or 30).
        vad_mode: Aggressiveness level, 0 (least) to 3 (most aggressive).

    Returns:
        List of (start_sample, end_sample) tuples, one per contiguous speech run.
    """
    vad = webrtcvad.Vad(vad_mode)

    # Convert the frame duration to samples.
    frame_duration_samples = int(sample_rate * frame_duration / 1000)

    # Scan fixed-size frames; open a segment on the first speech frame and
    # close it on the next non-speech frame.
    speech_segments = []
    start = -1  # sample index where the current speech run began; -1 = not in speech
    for i in range(0, len(audio_array), frame_duration_samples):
        frame = audio_array[i : i + frame_duration_samples]

        # webrtcvad requires complete frames, so treat a short trailing frame as
        # silence. (BUG FIX: was a hard-coded `160`, which only equals one frame
        # at 10 ms / 16 kHz; now derived from the actual frame size.)
        if len(frame) < frame_duration_samples:
            is_speech = False
        else:
            is_speech = vad.is_speech(frame.tobytes(), sample_rate)

        if is_speech and start == -1:
            start = i
        elif not is_speech and start != -1:
            speech_segments.append((start, i))
            start = -1

    # BUG FIX: a speech run that lasts until the end of the audio was previously
    # dropped because no silent frame followed to close the open segment.
    if start != -1:
        speech_segments.append((start, len(audio_array)))

    return speech_segments


def get_start_end_using_vad(audio, sample_rate):
    """Return (start_time, end_time) in seconds bounding the detected speech.

    Args:
        audio: pydub AudioSegment already resampled to `sample_rate`.
        sample_rate: Sample rate in Hz of `audio`.

    Returns:
        Tuple of floats (start_time, end_time) in seconds. If no speech is
        found even with a relaxed VAD mode, the whole clip is returned.
    """
    audio_array = np.array(audio.get_array_of_samples())

    speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE)
    if not speech_segments:
        # Retry with a less aggressive VAD mode before giving up.
        speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE - 1)

    if not speech_segments:
        # BUG FIX: previously raised IndexError on an empty list when no speech
        # was detected at all; fall back to keeping the entire clip.
        return 0.0, len(audio_array) / float(sample_rate)

    start_sample = speech_segments[0][0]
    end_sample = speech_segments[-1][1]

    # BUG FIX: convert with the actual `sample_rate` parameter rather than the
    # VAD_SR constant (identical for current callers, which pass VAD_SR, but
    # correct for any other rate).
    start_time = start_sample / float(sample_rate)
    end_time = end_sample / float(sample_rate)

    return start_time, end_time


def trim_silences(audio, target_sr):
    """Strip leading and trailing silence from a pydub AudioSegment.

    The segment is resampled to VAD_SR for speech detection only; the returned
    audio is cut from the original samples at `target_sr`.

    Args:
        audio: Mono pydub AudioSegment at `target_sr`.
        target_sr: Sample rate in Hz of `audio`.

    Returns:
        A new AudioSegment containing only the detected speech span.
    """
    # Run VAD on a 16 kHz copy; the original segment is left untouched.
    vad_copy = audio[:].set_frame_rate(VAD_SR)
    start_time, end_time = get_start_end_using_vad(vad_copy, VAD_SR)

    # Map the speech boundaries (seconds) back to sample indices at target_sr.
    first_sample = int(start_time * target_sr)
    last_sample = int(end_time * target_sr)

    trimmed_samples = np.array(audio.get_array_of_samples())[first_sample:last_sample]

    return AudioSegment(
        trimmed_samples.tobytes(),
        frame_rate=target_sr,
        sample_width=audio.sample_width,
        channels=audio.channels,
    )


def match_target_amplitude(audio, target_dBFS):
    """Return `audio` with gain applied so its loudness equals `target_dBFS`."""
    return audio.apply_gain(target_dBFS - audio.dBFS)


def process_wav(wav_path, target_sr, do_trim_silences=True):
    """Load an audio file and normalize it: mono, `target_sr`, 16-bit, -20 dBFS.

    Args:
        wav_path: Path to the input audio file.
        target_sr: Desired output sample rate in Hz.
        do_trim_silences: When True, strip leading/trailing silence via VAD.

    Returns:
        The processed pydub AudioSegment.
    """
    segment = AudioSegment.from_file(wav_path)

    # Downmix to mono when the source has multiple channels.
    if segment.channels > 1:
        segment = segment.set_channels(1)

    # Resample to the target rate, then force 16-bit PCM samples.
    segment = segment.set_frame_rate(target_sr).set_sample_width(2)

    # Optionally strip leading/trailing silence.
    if do_trim_silences:
        segment = trim_silences(segment, target_sr)

    # Loudness normalization to -20 dBFS.
    return match_target_amplitude(segment, -20.0)


def get_red_green_segments(dist_matrix, path, wav_type='ref', threshold=0.3):
    """Classify each frame of one aligned waveform as red (poor) or green (good).

    Args:
        dist_matrix: 2-D numpy array of pairwise frame distances
            (reference frames along rows, the other waveform along columns).
        path: Pair of index sequences (ref_indices, other_indices) describing
            the DTW alignment path.
        wav_type: "ref" scores the reference axis (rows); any other value
            scores the other axis (columns).
        threshold: Distances >= threshold are classified as "red".

    Returns:
        (red_segments, green_segments, wav_distances): red/green frame-index
        lists and the per-frame distance list for the chosen waveform.
    """
    scoring_ref = wav_type == "ref"
    num_wav_frames = len(dist_matrix) if scoring_ref else len(dist_matrix[0])

    wav_distances = [0] * num_wav_frames
    for (i, j) in zip(*path):
        # BUG FIX: when scoring the non-reference waveform, the frame index
        # along that axis is the column index j, not the row index i; indexing
        # by i mis-attributed distances and could raise IndexError whenever the
        # matrix has more rows than columns.
        frame_idx = i if scoring_ref else j
        wav_distances[frame_idx] = dist_matrix[i, j]

    red_segments = [k for k, d in enumerate(wav_distances) if d >= threshold]
    green_segments = [k for k, d in enumerate(wav_distances) if d < threshold]

    return red_segments, green_segments, wav_distances


def assess_pronunciation_quality(dist_matrix, path):
    """Score pronunciation quality as the fraction of well-aligned frames.

    Args:
        dist_matrix: 2-D numpy array of pairwise frame distances.
        path: DTW alignment path as (row_indices, col_indices).

    Returns:
        (quality_score, needs_repeat): quality_score in [0, 1] (1 = all frames
        well matched); needs_repeat is True when more than half the frames are
        poorly matched.
    """
    # NOTE(review): wav_type=None selects the non-reference axis in
    # get_red_green_segments (any value other than "ref" does) — confirm that
    # scoring the non-reference waveform is the intent here.
    red_segments, _, wav_distances = get_red_green_segments(dist_matrix, path, wav_type=None)

    num_red_segments = len(red_segments)
    total_segments = len(wav_distances)
    red_percentage = num_red_segments / total_segments if total_segments > 0 else 0.0

    # Quality is the complement of the poorly-matched fraction; ask for a
    # repeat when the majority of frames are bad.
    quality_score = 1 - red_percentage
    needs_repeat = red_percentage > 0.5

    # Debug information.
    print("Raw distance stats:")
    # BUG FIX: guard the min/max/mean prints — they raised ValueError on an
    # empty distance list even though the score above handled that case.
    if wav_distances:
        print(f"  Min distance: {min(wav_distances):.4f}")
        print(f"  Max distance: {max(wav_distances):.4f}")
        print(f"  Mean distance: {np.mean(wav_distances):.4f}")
    print("\nNormalized distance stats:")
    # BUG FIX: the message previously claimed the red threshold is 0.5, but
    # get_red_green_segments classifies red at >= 0.3 (its default threshold).
    print(f"  Number of red segments (>= 0.3): {num_red_segments}")
    print(f"  Total segments: {total_segments}")
    print(f"\nRed percentage: {red_percentage * 100:.2f}%")

    return quality_score, needs_repeat
    

def denoise_audio(input_audio_path):
    """Run the external `denoise` command on an audio file.

    Args:
        input_audio_path: Path to the input .wav file, as a string.

    Returns:
        Path of the denoised output file ("<stem>_denoised<ext>").

    Raises:
        TypeError: If input_audio_path is not a string.
        subprocess.CalledProcessError: If the denoise command exits non-zero.
    """
    # BUG FIX: validate with an explicit raise instead of `assert`, which is
    # stripped when Python runs with -O.
    if not isinstance(input_audio_path, str):
        raise TypeError("Input path must be a string")

    # BUG FIX: str.replace(".wav", ...) substituted EVERY ".wav" occurrence,
    # including ones inside directory names (e.g. "clips.wav/a.wav"); split
    # off the real extension instead.
    stem, ext = os.path.splitext(input_audio_path)
    output_audio_path = f"{stem}_denoised{ext}"

    subprocess.run(["denoise", input_audio_path, output_audio_path, "--plot"], check=True)

    return output_audio_path