# SPDX-FileContributor: Karl El Hajal

import os
import subprocess
from pathlib import Path

import numpy as np
import soundfile as sf
import webrtcvad
from pydub import AudioSegment

VAD_SR = 16000
VAD_MODE = 3  # Aggressiveness level (0-3, where 3 is the most aggressive)
VAD_FRAME_DURATION = 10  # Frame duration in milliseconds


def get_speech_segments_webrtcvad(audio_array, sample_rate, frame_duration, vad_mode):
    """Return (start, end) sample indices of speech regions detected by webrtcvad."""
    vad = webrtcvad.Vad(vad_mode)

    # Convert the frame duration to samples
    frame_duration_samples = int(sample_rate * frame_duration / 1000)

    # Detect speech regions using VAD
    speech_segments = []
    start = -1
    for i in range(0, len(audio_array), frame_duration_samples):
        frame = audio_array[i : i + frame_duration_samples]
        if len(frame) < frame_duration_samples:
            # webrtcvad only accepts full frames; treat the trailing partial frame as non-speech
            is_speech = False
        else:
            is_speech = vad.is_speech(frame.tobytes(), sample_rate)
        if is_speech and start == -1:
            start = i
        elif not is_speech and start != -1:
            speech_segments.append((start, i))
            start = -1

    # Close a segment that is still open when the audio ends during speech
    if start != -1:
        speech_segments.append((start, len(audio_array)))

    return speech_segments


def get_start_end_using_vad(audio, sample_rate):
    """Return the start and end times (in seconds) of the detected speech in a pydub AudioSegment."""
    audio_array = np.array(audio.get_array_of_samples())
    speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE)
    if len(speech_segments) == 0:
        # Fall back to a less aggressive VAD mode if nothing was detected
        speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE - 1)
    start_sample = speech_segments[0][0]
    end_sample = speech_segments[-1][1]
    start_time = start_sample / sample_rate
    end_time = end_sample / sample_rate
    return start_time, end_time


def trim_silences(audio, target_sr):
    """Trim leading and trailing silence from a pydub AudioSegment sampled at target_sr."""
    audio_copy = audio[:]
    audio_copy = audio_copy.set_frame_rate(VAD_SR)
    start_time, end_time = get_start_end_using_vad(audio_copy, VAD_SR)
    start_sample_orig_sr = int(start_time * target_sr)
    end_sample_orig_sr = int(end_time * target_sr)
    filtered_audio_array = np.array(audio.get_array_of_samples())
    filtered_audio_array = filtered_audio_array[start_sample_orig_sr:end_sample_orig_sr]
    filtered_audio = AudioSegment(
        filtered_audio_array.tobytes(),
        frame_rate=target_sr,
        sample_width=audio.sample_width,
        channels=audio.channels,
    )
    return filtered_audio


def match_target_amplitude(audio, target_dBFS):
    """Apply gain so that the AudioSegment reaches the target loudness in dBFS."""
    change_in_dBFS = target_dBFS - audio.dBFS
    return audio.apply_gain(change_in_dBFS)


def process_wav(wav_path, target_sr, do_trim_silences=True):
    """Load an audio file, convert it to mono 16-bit PCM at target_sr, optionally trim silences, and normalize loudness."""
    audio = AudioSegment.from_file(wav_path)

    # Convert audio to mono
    if audio.channels > 1:
        audio = audio.set_channels(1)

    # Resample audio
    audio = audio.set_frame_rate(target_sr)

    # Convert the audio to 16-bit PCM format
    audio = audio.set_sample_width(2)

    # Remove leading and trailing silences
    if do_trim_silences:
        audio = trim_silences(audio, target_sr)

    # Loudness normalization to -20 dBFS
    audio = match_target_amplitude(audio, -20.0)

    return audio


def get_red_green_segments(dist_matrix, path, wav_type="ref", threshold=0.4):
    """Split the frames of one wav into red (distance >= threshold) and green (distance < threshold) segments
    along an alignment path over the frame-distance matrix."""
    if wav_type == "ref":
        num_wav_frames = len(dist_matrix)
    else:
        num_wav_frames = len(dist_matrix[0])

    wav_distances = [0] * num_wav_frames
    for i, j in zip(*path):
        # Index frames by row (reference wav) or column (other wav) depending on wav_type
        frame_idx = i if wav_type == "ref" else j
        if frame_idx == num_wav_frames - 2 and wav_distances[frame_idx] > 0:
            # Special case for the second-to-last frame: keep the first distance assigned to it
            continue
        wav_distances[frame_idx] = dist_matrix[i, j]

    red_segments = [i for i, d in enumerate(wav_distances) if d >= threshold]
    green_segments = [i for i, d in enumerate(wav_distances) if d < threshold]

    return red_segments, green_segments, wav_distances


def assess_pronunciation_quality(dist_matrix, path, threshold=0.4, wav_type="ref"):
    """Score pronunciation quality as the fraction of frames whose aligned distance stays below the threshold."""
    # _ is green_segments
    red_segments, _, wav_distances = get_red_green_segments(dist_matrix, path, wav_type=wav_type, threshold=threshold)

    # Analyze normalized distances
    num_red_segments = len(red_segments)
    total_segments = len(wav_distances)
    red_percentage = num_red_segments / total_segments if total_segments > 0 else 0.0

    # Calculate quality score and whether a repetition is needed
    quality_score = 1 - red_percentage
    needs_repeat = red_percentage > 0.5

    # Print debug information
    print("Raw distance stats:")
    print(f"  Min distance: {min(wav_distances):.4f}")
    print(f"  Max distance: {max(wav_distances):.4f}")
    print(f"  Mean distance: {np.mean(wav_distances):.4f}")
    print("\nNormalized distance stats:")
    print(f"  Number of red segments (>= {threshold}): {num_red_segments}")
    print(f"  Total segments: {total_segments}")
    print(f"\nRed percentage: {red_percentage * 100:.2f}%")

    return quality_score, needs_repeat


def denoise_audio(input_audio_path):
    """Denoise a wav file with the external `denoise` CLI and return the path to the denoised file.

    On failure, the original input path is returned unchanged."""
    assert isinstance(input_audio_path, (str, Path)), "Input path must be a string or a Path object"
    input_audio_path = str(input_audio_path)
    output_audio_path = input_audio_path.replace(".wav", "_denoised.wav")

    try:
        # Load audio and convert to the format expected by the denoiser
        audio = AudioSegment.from_wav(input_audio_path)
        audio = audio.set_frame_rate(48000)  # Set to 48 kHz
        audio = audio.set_channels(1)  # Convert to mono
        audio = audio.set_sample_width(2)  # Set to 16-bit

        # Export as WAV with the correct format
        temp_wav = "temp_audio.wav"
        audio.export(temp_wav, format="wav")

        # Run denoising
        result = subprocess.run(
            ["denoise", temp_wav, output_audio_path, "--plot"],
            check=True,
            capture_output=True,
            text=True,
        )
        print(result.stdout)

        # Clean up the temporary file
        os.remove(temp_wav)
    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
        print(f"Stdout: {e.stdout}")
        print(f"Stderr: {e.stderr}")
        return input_audio_path
    except Exception as e:
        print(f"Unexpected error: {e}")
        return input_audio_path

    return output_audio_path
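

# Minimal usage sketch: chains the preprocessing and assessment helpers above. The file names,
# the 16 kHz target sample rate, and the random dist_matrix/path are placeholder assumptions;
# in practice the distance matrix and alignment path would come from an external alignment step.
if __name__ == "__main__":
    # Preprocess a recording: mono, 16-bit PCM, resampled, silence-trimmed, loudness-normalized
    processed = process_wav("example.wav", target_sr=16000)  # "example.wav" is a placeholder
    processed.export("example_processed.wav", format="wav")

    # Assess pronunciation quality from a dummy frame-distance matrix and monotonic alignment path
    rng = np.random.default_rng(0)
    dist_matrix = rng.random((50, 60))  # placeholder: 50 reference frames vs. 60 test frames
    path = (np.arange(50), np.linspace(0, 59, 50, dtype=int))  # placeholder alignment path
    score, needs_repeat = assess_pronunciation_quality(dist_matrix, path, threshold=0.4)
    print(f"Quality score: {score:.2f}, needs repeat: {needs_repeat}")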