File size: 6,157 Bytes
4baa40f
 
 
 
 
2021a20
2bd85f5
 
ac15166
2bd85f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8e3f29e
3a1ba93
 
 
 
6ea1d37
 
1fe441b
 
3a1ba93
 
 
 
 
 
6ea1d37
3a1ba93
c5b2ff8
3a1ba93
c5b2ff8
21f3853
 
3a1ba93
6ea1d37
21f3853
 
 
 
 
 
 
 
3a1ba93
 
 
21f3853
 
 
 
 
2021a20
 
 
 
ac15166
 
2021a20
2bd85f5
324080c
20ee97d
 
 
 
 
2bd85f5
90de1d1
 
 
2bd85f5
90de1d1
2bd85f5
90de1d1
2bd85f5
 
 
 
324080c
2bd85f5
a68ae9c
90de1d1
2bd85f5
324080c
 
 
 
2bd85f5
 
 
 
a68ae9c
2bd85f5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# SPDX-FileContributor: Karl El Hajal

import numpy as np
import webrtcvad
from pydub import AudioSegment
import subprocess
import soundfile as sf
import os
from pathlib import Path


# VAD configuration. webrtcvad only accepts 8/16/32/48 kHz input and
# 10/20/30 ms frames, so audio is resampled to VAD_SR before analysis.
VAD_SR = 16000
VAD_MODE = 3  # Aggressiveness level (0-3, where 3 is the most aggressive)
VAD_FRAME_DURATION = 10  # Frame duration in milliseconds

def get_speech_segments_webrtcvad(audio_array, sample_rate, frame_duration, vad_mode):
    """Detect speech regions in 16-bit PCM audio with WebRTC VAD.

    Args:
        audio_array: 1-D numpy array of int16 samples (mono assumed — TODO confirm).
        sample_rate: Sample rate in Hz (webrtcvad supports 8/16/32/48 kHz).
        frame_duration: Frame length in milliseconds (webrtcvad accepts 10/20/30).
        vad_mode: Aggressiveness, 0 (least) to 3 (most aggressive).

    Returns:
        List of (start_sample, end_sample) tuples for each detected speech run.
    """
    vad = webrtcvad.Vad(vad_mode)

    # Convert the frame duration to samples.
    frame_duration_samples = int(sample_rate * frame_duration / 1000)

    # Detect speech regions using VAD.
    speech_segments = []
    start = -1  # -1 means "not currently inside a speech run"
    for i in range(0, len(audio_array), frame_duration_samples):
        frame = audio_array[i : i + frame_duration_samples]

        # webrtcvad requires complete frames; treat a trailing partial frame
        # as silence (the original compared against a hard-coded 160, which
        # only matches 10 ms at 16 kHz).
        if len(frame) < frame_duration_samples:
            is_speech = False
        else:
            is_speech = vad.is_speech(frame.tobytes(), sample_rate)

        if is_speech and start == -1:
            start = i
        elif not is_speech and start != -1:
            speech_segments.append((start, i))
            start = -1

    # Close a segment that is still open when the audio ends; the original
    # silently dropped trailing speech in this case.
    if start != -1:
        speech_segments.append((start, len(audio_array)))

    return speech_segments


def get_start_end_using_vad(audio, sample_rate):
    """Return (start_time, end_time) in seconds of the speech span of `audio`.

    Runs WebRTC VAD at the configured aggressiveness and, if nothing is
    detected, retries once with a less aggressive mode. If still nothing is
    found, falls back to the full clip instead of raising IndexError.

    Args:
        audio: pydub AudioSegment already resampled to `sample_rate`.
        sample_rate: Sample rate of `audio` in Hz.
    """
    audio_array = np.array(audio.get_array_of_samples())

    speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE)
    if len(speech_segments) == 0:
        speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE - 1)
    if len(speech_segments) == 0:
        # No speech at all: keep the whole clip rather than crashing on [0].
        return 0.0, float(len(audio_array) / sample_rate)

    start_sample = speech_segments[0][0]
    end_sample = speech_segments[-1][1]

    # Convert sample offsets to seconds using the actual rate of the array we
    # analyzed (the original hard-coded VAD_SR here despite the parameter).
    start_time = float(start_sample / sample_rate)
    end_time = float(end_sample / sample_rate)

    return start_time, end_time


def trim_silences(audio, target_sr):
    """Cut leading/trailing silence from `audio` using VAD at 16 kHz.

    VAD runs on a 16 kHz copy; the detected start/end times are mapped back
    to sample offsets at `target_sr` and the original samples are sliced.
    NOTE(review): assumes `audio` is mono — sample-index slicing would
    misalign interleaved multichannel data; confirm against callers.
    """
    # Analyze a resampled copy so the original audio is left untouched.
    vad_input = audio[:].set_frame_rate(VAD_SR)
    start_time, end_time = get_start_end_using_vad(vad_input, VAD_SR)

    # Map times (seconds) back to sample indices at the original rate.
    first = int(start_time * target_sr)
    last = int(end_time * target_sr)

    trimmed_samples = np.array(audio.get_array_of_samples())[first:last]

    return AudioSegment(
        trimmed_samples.tobytes(),
        frame_rate=target_sr,
        sample_width=audio.sample_width,
        channels=audio.channels,
    )


def match_target_amplitude(audio, target_dBFS):
    """Apply the gain needed to bring `audio` to `target_dBFS` loudness."""
    gain = target_dBFS - audio.dBFS
    return audio.apply_gain(gain)


def process_wav(wav_path, target_sr, do_trim_silences=True):
    """Load and normalize a wav file.

    Pipeline: mono -> resample to `target_sr` -> 16-bit PCM -> optional
    VAD-based silence trimming -> loudness normalization to -20 dBFS.

    Args:
        wav_path: Path to the input audio file.
        target_sr: Desired output sample rate in Hz.
        do_trim_silences: When True, strip leading/trailing silence.

    Returns:
        The processed pydub AudioSegment.
    """
    audio = AudioSegment.from_file(wav_path)

    # Downmix to mono before any sample-level processing.
    if audio.channels > 1:
        audio = audio.set_channels(1)

    # Resample, then force 16-bit PCM.
    audio = audio.set_frame_rate(target_sr).set_sample_width(2)

    if do_trim_silences:
        audio = trim_silences(audio, target_sr)

    # Loudness normalization to -20 dBFS.
    return match_target_amplitude(audio, -20.0)


def get_red_green_segments(dist_matrix, path, wav_type='ref', threshold=0.4):
    """Classify aligned frames of one wav as red (poor) or green (good).

    Args:
        dist_matrix: 2-D array of pairwise frame distances; rows index the
            reference wav's frames, columns the other wav's frames.
        path: DTW alignment path as a pair of index sequences (rows, cols).
        wav_type: 'ref' to score the row-axis wav, anything else for the
            column-axis wav.
        threshold: Distances >= threshold count as "red".

    Returns:
        (red_segments, green_segments, wav_distances): frame indices at or
        above / below the threshold, plus the per-frame distance list.
    """
    use_rows = wav_type == "ref"
    num_wav_frames = len(dist_matrix) if use_rows else len(dist_matrix[0])

    wav_distances = [0] * num_wav_frames
    for (i, j) in zip(*path):
        # Index along the axis of the wav being scored. The original always
        # used the row index `i`, which is wrong for the column-axis wav and
        # can index past the end of wav_distances.
        idx = i if use_rows else j
        # Special case: keep the first distance recorded for the
        # second-to-last frame instead of overwriting it.
        if idx == num_wav_frames - 2 and wav_distances[idx] > 0:
            continue
        wav_distances[idx] = dist_matrix[i, j]

    red_segments = [k for k, d in enumerate(wav_distances) if d >= threshold]
    green_segments = [k for k, d in enumerate(wav_distances) if d < threshold]

    return red_segments, green_segments, wav_distances


def assess_pronunciation_quality(dist_matrix, path, threshold=0.4, wav_type="ref"):
    """Score pronunciation quality from DTW frame distances.

    Args:
        dist_matrix: 2-D array of pairwise frame distances.
        path: DTW alignment path as a pair of index sequences (rows, cols).
        threshold: Distances >= threshold count as "red" (poor) frames.
        wav_type: Which wav's frames to score; forwarded to
            get_red_green_segments.

    Returns:
        (quality_score, needs_repeat): quality_score is the fraction of
        frames below `threshold`; needs_repeat is True when more than half
        of the frames are red.
    """
    # _ is green_segments
    red_segments, _, wav_distances = get_red_green_segments(
        dist_matrix, path, wav_type=wav_type, threshold=threshold
    )

    # Analyze normalized distances.
    num_red_segments = len(red_segments)
    total_segments = len(wav_distances)
    red_percentage = num_red_segments / total_segments if total_segments > 0 else 0.0

    # Calculate quality score and repetition need.
    quality_score = 1 - red_percentage
    needs_repeat = red_percentage > 0.5

    # Debug output; guard the stats so an empty distance list cannot crash min()/max().
    if wav_distances:
        print("Raw distance stats:")
        print(f"  Min distance: {min(wav_distances):.4f}")
        print(f"  Max distance: {max(wav_distances):.4f}")
        print(f"  Mean distance: {np.mean(wav_distances):.4f}")
    print(f"\nNormalized distance stats:")
    # The original message hard-coded ">= 0.5" even though the cutoff is `threshold`.
    print(f"  Number of red segments (>= {threshold}): {num_red_segments}")
    print(f"  Total segments: {total_segments}")
    print(f"\nRed percentage: {red_percentage * 100:.2f}%")

    return quality_score, needs_repeat
    

def denoise_audio(input_audio_path):
    """Denoise a wav file via the external `denoise` CLI tool.

    The input is converted to 48 kHz mono 16-bit PCM, written to a temporary
    wav, and passed to `denoise`. On any failure the original path is
    returned so callers can fall back to the raw audio.

    Args:
        input_audio_path: str or Path to a .wav file.

    Returns:
        Path (str) of the denoised file, or the input path on failure.

    Raises:
        TypeError: If `input_audio_path` is not a str or Path.
    """
    if not isinstance(input_audio_path, (str, Path)):
        # Raise instead of assert: assert is stripped under `python -O`.
        raise TypeError("Input path must be a string or a Path object")
    input_audio_path = str(input_audio_path)
    output_audio_path = input_audio_path.replace(".wav", "_denoised.wav")

    temp_wav = "temp_audio.wav"
    try:
        # Load audio and convert to the format the denoiser expects.
        audio = AudioSegment.from_wav(input_audio_path)
        audio = audio.set_frame_rate(48000)  # Set to 48 kHz
        audio = audio.set_channels(1)        # Convert to mono
        audio = audio.set_sample_width(2)    # Set to 16-bit

        # Export as WAV with correct format.
        audio.export(temp_wav, format="wav")

        # Run denoising.
        result = subprocess.run(
            ["denoise", temp_wav, output_audio_path, "--plot"],
            check=True,
            capture_output=True,
            text=True
        )
        print(result.stdout)

    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
        print(f"Stdout: {e.stdout}")
        print(f"Stderr: {e.stderr}")
        return input_audio_path
    except Exception as e:
        print(f"Unexpected error: {e}")
        return input_audio_path
    finally:
        # The original removed the temp file only on the success path,
        # leaking it whenever export or the subprocess failed.
        if os.path.exists(temp_wav):
            os.remove(temp_wav)

    return output_audio_path