import os
from typing import List, Tuple

import numpy as np
from pydub import AudioSegment

from chunkformer import ChunkFormerModel
from clearvoice import ClearVoice
# ======================= ASR + CLEARVOICE + AUDIO PROCESSING =======================
ASR_MODEL = None
CLEARVOICE_MODEL = None
REF_AUDIO_CACHE = {}  # cache: input path -> processed output path

def get_asr_model() -> ChunkFormerModel:
    """Lazy-load ChunkFormer (ASR, runs on CPU)."""
    global ASR_MODEL
    if ASR_MODEL is None:
        ASR_MODEL = ChunkFormerModel.from_pretrained("khanhld/chunkformer-ctc-large-vie")
    return ASR_MODEL

def get_clearvoice_model() -> ClearVoice:
    """Lazy-load ClearVoice for denoising the reference audio."""
    global CLEARVOICE_MODEL
    if CLEARVOICE_MODEL is None:
        CLEARVOICE_MODEL = ClearVoice(
            task="speech_enhancement",
            model_names=["MossFormer2_SE_48K"],
        )
    return CLEARVOICE_MODEL
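
# Hedged note: both loaders above are process-wide singletons, so repeated calls
# reuse the first instance (the first call downloads the model weights), e.g.:
#   assert get_asr_model() is get_asr_model()
#   assert get_clearvoice_model() is get_clearvoice_model()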

def find_silent_regions(
    audio: AudioSegment,
    silence_thresh: float = 0.05,  # amplitude after normalization to [-1, 1]
    chunk_ms: int = 10,
    min_silence_len: int = 200,
) -> List[Tuple[int, int]]:
    """
    Find silent regions (start_ms, end_ms) in an AudioSegment based on amplitude.
    """
    samples = np.array(audio.get_array_of_samples(), dtype=np.float32)
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)
    norm = samples / (2 ** (audio.sample_width * 8 - 1))
    sr = audio.frame_rate
    chunk_size = max(1, int(sr * chunk_ms / 1000))
    total_chunks = len(norm) // chunk_size
    silent_regions: List[Tuple[int, int]] = []
    start = None
    for i in range(total_chunks):
        chunk = norm[i * chunk_size: (i + 1) * chunk_size]
        if chunk.size == 0:
            continue
        if np.all(np.abs(chunk) < silence_thresh):
            if start is None:
                start = i
        else:
            if start is not None:
                dur = (i - start) * chunk_ms
                if dur >= min_silence_len:
                    silent_regions.append((start * chunk_ms, i * chunk_ms))
                start = None
    # close a silent run that extends to the end of the audio
    if start is not None:
        dur = (total_chunks - start) * chunk_ms
        if dur >= min_silence_len:
            silent_regions.append((start * chunk_ms, total_chunks * chunk_ms))
    return silent_regions
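
# A minimal sketch of find_silent_regions on synthetic audio; the 440 Hz tone and
# 500 ms gap are illustrative values, not part of the original pipeline. Call
# _demo_find_silent_regions() manually to try it (nothing runs at import time).
def _demo_find_silent_regions() -> None:
    from pydub.generators import Sine

    tone = Sine(440).to_audio_segment(duration=1000)  # 1 s of full-scale 440 Hz
    gap = AudioSegment.silent(duration=500, frame_rate=tone.frame_rate)
    clip = tone + gap + tone
    print(find_silent_regions(clip))  # expect roughly [(1000, 1500)]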

def trim_leading_trailing_silence(
    audio: AudioSegment,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 200,
) -> AudioSegment:
    """
    Remove leading and trailing silence from the file.
    """
    duration = len(audio)
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    if not silent_regions:
        return audio
    start_trim = 0
    end_trim = duration
    # silence at the start of the file
    first_start, first_end = silent_regions[0]
    if first_start <= 0:
        start_trim = max(start_trim, first_end)
    # silence at the end of the file
    last_start, last_end = silent_regions[-1]
    if last_end >= duration:
        end_trim = min(end_trim, last_start)
    return audio[start_trim:end_trim]
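
# A minimal sketch of trim_leading_trailing_silence; the 400 ms pads are
# illustrative. Internal silence (none here) would be left untouched.
def _demo_trim_silence() -> None:
    from pydub.generators import Sine

    tone = Sine(440).to_audio_segment(duration=1000)
    pad = AudioSegment.silent(duration=400, frame_rate=tone.frame_rate)
    clip = pad + tone + pad
    print(len(clip), len(trim_leading_trailing_silence(clip)))  # 1800 -> ~1000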

def compress_internal_silence(
    audio: AudioSegment,
    max_silence_ms: int = 300,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 50,
) -> AudioSegment:
    """
    Shorten internal silences:
    - silences <= max_silence_ms are kept as-is
    - silences > max_silence_ms are cut down to max_silence_ms
    """
    duration = len(audio)
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    if not silent_regions:
        return audio
    new_audio = AudioSegment.silent(duration=0, frame_rate=audio.frame_rate)
    cursor = 0
    for s_start, s_end in silent_regions:
        # speech before this silent region
        if s_start > cursor:
            new_audio += audio[cursor:s_start]
        silence_len = s_end - s_start
        if silence_len <= max_silence_ms:
            new_audio += audio[s_start:s_end]
        else:
            new_audio += audio[s_start: s_start + max_silence_ms]
        cursor = s_end
    # whatever remains after the last silent region
    if cursor < duration:
        new_audio += audio[cursor:]
    return new_audio
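
# A minimal sketch of compress_internal_silence: a 1 s internal gap should be
# squeezed down to max_silence_ms. Values are illustrative.
def _demo_compress_internal_silence() -> None:
    from pydub.generators import Sine

    tone = Sine(440).to_audio_segment(duration=1000)
    gap = AudioSegment.silent(duration=1000, frame_rate=tone.frame_rate)
    clip = tone + gap + tone
    shorter = compress_internal_silence(clip, max_silence_ms=300)
    print(len(clip), len(shorter))  # expect 3000 -> ~2300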

def select_subsegment_by_silence(
    audio: AudioSegment,
    min_len_ms: int = 5000,
    max_len_ms: int = 10000,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 200,
) -> AudioSegment:
    """
    If the audio is longer than max_len_ms, pick one segment whose length is in
    [min_len_ms, max_len_ms], cutting inside silent regions to avoid clipping speech.
    """
    duration = len(audio)
    if duration <= max_len_ms:
        return audio
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    if not silent_regions:
        # no silence found -> take the middle of the file
        target_len = min(max_len_ms, duration)
        start = max(0, (duration - target_len) // 2)
        end = start + target_len
        return audio[start:end]
    # each boundary is the midpoint of a silent region (guaranteed to lie in silence)
    boundaries = [0]
    for s_start, s_end in silent_regions:
        mid = (s_start + s_end) // 2
        if 0 < mid < duration:
            boundaries.append(mid)
    boundaries.append(duration)
    boundaries = sorted(set(boundaries))
    # prefer the first segment that fits in [min_len_ms, max_len_ms] (5-10 s by default)
    for i in range(len(boundaries)):
        for j in range(i + 1, len(boundaries)):
            seg_len = boundaries[j] - boundaries[i]
            if min_len_ms <= seg_len <= max_len_ms:
                return audio[boundaries[i]:boundaries[j]]
    # if no segment fits entirely in [min, max], pick the one closest to max_len_ms
    best_i, best_j, best_diff = 0, None, None
    for i in range(len(boundaries)):
        for j in range(i + 1, len(boundaries)):
            seg_len = boundaries[j] - boundaries[i]
            if seg_len >= min_len_ms:
                diff = abs(seg_len - max_len_ms)
                if best_diff is None or diff < best_diff:
                    best_diff = diff
                    best_i, best_j = i, j
    if best_j is not None:
        return audio[boundaries[best_i]:boundaries[best_j]]
    # final fallback: take the middle of the file
    target_len = min(max_len_ms, duration)
    start = max(0, (duration - target_len) // 2)
    end = start + target_len
    return audio[start:end]
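
# A minimal sketch of select_subsegment_by_silence: 13 s of audio with two gaps
# should be cut at a silence midpoint to a length in [5000, 10000] ms.
def _demo_select_subsegment() -> None:
    from pydub.generators import Sine

    tone = Sine(440).to_audio_segment(duration=4000)
    gap = AudioSegment.silent(duration=500, frame_rate=tone.frame_rate)
    clip = tone + gap + tone + gap + tone  # 13 s total
    sub = select_subsegment_by_silence(clip)
    print(len(clip), len(sub))  # expect 13000 -> ~8750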

def enhance_ref_audio(input_path: str) -> str:
    """
    WAV preprocessing pipeline for TTS:
    - denoise with ClearVoice
    - trim leading/trailing silence
    - shorten internal silences longer than 0.3 s down to 0.3 s
    - if the audio is longer than 10 s, pick a 5-10 s segment cut at a silence
    Returns the path of the processed wav file.
    """
    if not input_path:
        raise ValueError("No input audio path for enhancement.")
    # cache so the same file is not processed more than once
    if input_path in REF_AUDIO_CACHE:
        return REF_AUDIO_CACHE[input_path]
    cv = get_clearvoice_model()
    # compute output names up front so "name" exists even if denoising fails
    base = os.path.basename(input_path)
    name, ext = os.path.splitext(base)
    if not ext:
        ext = ".wav"
    # 1) denoise
    try:
        cv_out = cv(input_path=input_path, online_write=False)
        denoised_path = os.path.join(os.path.dirname(input_path), f"{name}_denoised{ext}")
        cv.write(cv_out, output_path=denoised_path)
    except Exception as e:
        print(f"[ClearVoice] Error during denoising, falling back to original: {e}")
        denoised_path = input_path
    # 2) silence handling + length selection with pydub
    audio = AudioSegment.from_file(denoised_path)
    # trim leading/trailing silence
    audio = trim_leading_trailing_silence(audio)
    # shorten internal silences
    audio = compress_internal_silence(audio, max_silence_ms=300)
    # if longer than 10 s, pick a 5-10 s segment
    audio = select_subsegment_by_silence(audio, min_len_ms=5000, max_len_ms=10000)
    # 3) write out a new file
    enhanced_path = os.path.join(os.path.dirname(denoised_path), f"{name}_enhanced.wav")
    audio.export(enhanced_path, format="wav")
    REF_AUDIO_CACHE[input_path] = enhanced_path
    return enhanced_path
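
# Hedged usage sketch: "my_ref.wav" is a placeholder path, and the first call
# downloads the MossFormer2_SE_48K weights, so this is left as a comment:
#   clean_path = enhance_ref_audio("my_ref.wav")
#   clean_path = enhance_ref_audio("my_ref.wav")  # second call hits the cache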

def split_audio_by_silence(
    audio: AudioSegment,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 200,
    min_segment_len: int = 200,
) -> List[Tuple[int, int]]:
    """
    Return the non-silent (speech) segments of an AudioSegment,
    split at the silent regions.
    """
    duration = len(audio)
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    segments: List[Tuple[int, int]] = []
    cur_start = 0
    for s_start, s_end in silent_regions:
        if cur_start < s_start:
            if s_start - cur_start >= min_segment_len:
                segments.append((cur_start, s_start))
        cur_start = s_end
    if cur_start < duration and duration - cur_start >= min_segment_len:
        segments.append((cur_start, duration))
    # if no segment was found, use the whole file
    if not segments:
        segments.append((0, duration))
    return segments
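
# A minimal sketch of split_audio_by_silence: two tones around a 400 ms gap
# should come back as two speech segments. Values are illustrative.
def _demo_split_by_silence() -> None:
    from pydub.generators import Sine

    tone = Sine(440).to_audio_segment(duration=1000)
    gap = AudioSegment.silent(duration=400, frame_rate=tone.frame_rate)
    clip = tone + gap + tone
    print(split_audio_by_silence(clip))  # expect ~[(0, 1000), (1400, 2400)]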

def transcribe_ref_audio(audio_path: str) -> str:
    """
    ASR as required:
    - split the audio at silent regions
    - run ASR on each segment
    - join the texts with commas
    """
    if not audio_path:
        raise ValueError("No audio path for ASR.")
    model = get_asr_model()
    audio = AudioSegment.from_file(audio_path)
    segments = split_audio_by_silence(audio)
    texts = []
    base, _ = os.path.splitext(audio_path)
    for idx, (start_ms, end_ms) in enumerate(segments):
        seg_audio = audio[start_ms:end_ms]
        seg_path = f"{base}_seg_{idx}.wav"
        seg_audio.export(seg_path, format="wav")
        try:
            transcription = model.endless_decode(
                audio_path=seg_path,
                chunk_size=32,
                left_context_size=0,
                right_context_size=0,
                total_batch_duration=400,
                return_timestamps=False,
            )
        except TypeError:
            # fall back for versions whose endless_decode lacks return_timestamps
            transcription = model.endless_decode(
                audio_path=seg_path,
                chunk_size=32,
                left_context_size=0,
                right_context_size=0,
                total_batch_duration=400,
            )
        text = transcription if isinstance(transcription, str) else str(transcription)
        text = text.strip()
        if text:
            texts.append(text)
        # remove the temporary segment file once transcribed
        if os.path.exists(seg_path):
            os.remove(seg_path)
    return ", ".join(texts)