Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import shutil | |
| import zipfile | |
| import sherpa_onnx | |
| import csv | |
| import numpy as np | |
| import gc | |
| import re | |
| from pydub import AudioSegment | |
| from huggingface_hub import hf_hub_download | |
| import urllib.request | |
# --- CONFIGURATION ---
# Hugging Face repository (a Space) that hosts the ASR model files.
MY_REPO_ID = "hoanglinhn0/CUTPRO"

# Transducer model components downloaded from MY_REPO_ID.
ENCODER_FILENAME = "encoder-epoch-20-avg-10.onnx"
DECODER_FILENAME = "decoder-epoch-20-avg-10.onnx"
JOINER_FILENAME = "joiner-epoch-20-avg-10.onnx"

# NOTE(review): the token table is fetched from a file named "config.json";
# presumably the tokens file was uploaded under that name -- confirm.
TOKENS_FILENAME = "config.json"

# Sample rate (Hz) used for both the VAD and the recognizer.
ASR_SAMPLE_RATE = 16_000

# --- GLOBAL STATE ---
# Recognizer instance, populated by load_asr_model() on success.
recognizer = None
# "OK" once the model is loaded, otherwise the load error message.
model_status = ""
def load_asr_model():
    """Download the transducer model files and build the global recognizer.

    The encoder, decoder, joiner and token table are fetched from the
    ``MY_REPO_ID`` Space. The token table is re-written line by line to
    ``tokens_fixed.txt`` in the working directory, and that copy is what
    the recognizer is given.

    Returns:
        ``"OK"`` on success, otherwise the text of the exception raised.
    """
    global recognizer, model_status
    try:
        print("⏳ Đang tải ASR model...")

        # Fetch the four files in a fixed order: encoder, decoder, joiner, tokens.
        wanted = (ENCODER_FILENAME, DECODER_FILENAME, JOINER_FILENAME, TOKENS_FILENAME)
        local_paths = [
            hf_hub_download(repo_id=MY_REPO_ID, filename=name, repo_type="space")
            for name in wanted
        ]
        encoder_path, decoder_path, joiner_path, raw_tokens_path = local_paths

        # Copy the token table through text mode into a local file.
        fixed_tokens_path = "tokens_fixed.txt"
        with open(raw_tokens_path, 'r', encoding='utf-8') as src:
            token_lines = src.readlines()
        with open(fixed_tokens_path, 'w', encoding='utf-8') as dst:
            dst.writelines(token_lines)

        recognizer = sherpa_onnx.OfflineRecognizer.from_transducer(
            encoder=encoder_path,
            decoder=decoder_path,
            joiner=joiner_path,
            tokens=fixed_tokens_path,
            num_threads=4,
            sample_rate=ASR_SAMPLE_RATE,
            decoding_method="greedy_search",
        )
    except Exception as e:
        # The message is surfaced to the UI through model_status.
        return str(e)
    return "OK"


# Load the model once at import time; the result gates process_audio_vad().
model_status = load_asr_model()
def _ensure_vad_model(vad_path, logs):
    """Download the Silero VAD model to *vad_path* unless it already exists."""
    if os.path.exists(vad_path):
        logs.append("✅ VAD model đã có sẵn.")
        return

    logs.append("⏳ Đang tải silero_vad.onnx...")
    # Download to a side file and rename afterwards, so an interrupted
    # download never leaves a truncated model that later runs would accept.
    partial_path = vad_path + ".part"
    try:
        urllib.request.urlretrieve(
            "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/silero_vad.onnx",
            partial_path
        )
        os.replace(partial_path, vad_path)
    finally:
        if os.path.exists(partial_path):
            os.remove(partial_path)
    logs.append("✅ Tải VAD xong.")


def _load_padded_samples(audio_file, trailing_silence_seconds):
    """Decode *audio_file* to mono float32 samples followed by digital silence."""
    sound = (
        AudioSegment.from_file(audio_file)
        .set_frame_rate(ASR_SAMPLE_RATE)
        .set_channels(1)
        # Force 16-bit PCM: the division by 32768 below is only correct for
        # 16-bit samples, and the input may be 8/24/32-bit.
        .set_sample_width(2)
    )
    samples = np.array(sound.get_array_of_samples()).astype(np.float32) / 32768.0
    padding = np.zeros(int(ASR_SAMPLE_RATE * trailing_silence_seconds), dtype=np.float32)
    return np.concatenate((samples, padding))


def _collect_speech_segments(vad_engine, samples, window_size):
    """Feed *samples* to the VAD window by window and return the detected segments."""
    for start in range(0, len(samples), window_size):
        vad_engine.accept_waveform(samples[start:start + window_size])

    segments = []
    while not vad_engine.empty():
        segments.append(np.array(vad_engine.front.samples, dtype=np.float32))
        vad_engine.pop()
    return segments


def process_audio_vad(audio_files, min_speech_duration, min_silence_duration):
    """Split audio files into utterances with VAD, transcribe them and zip the result.

    Each input file is resampled to ``ASR_SAMPLE_RATE`` mono, segmented by
    the Silero VAD, and every segment is transcribed with the global
    ``recognizer``. Segments whose transcript is longer than two characters
    are exported as 22050 Hz WAV files, and listed in ``metadata.csv``
    (``filename|text``), in the order the files were given.

    Args:
        audio_files: Paths of the audio files to process, in order.
        min_speech_duration: Minimum speech length in seconds for the VAD.
        min_silence_duration: Minimum silence in seconds that ends a segment.

    Returns:
        ``(zip_path, log_text)`` on success, or ``(None, message)`` when the
        model is not loaded, no file was given, or processing failed.
    """
    if model_status != "OK":
        return None, f"❌ Lỗi ASR Model: {model_status}"
    if not audio_files:
        return None, "Vui lòng chọn ít nhất một file audio."

    # NOTE(review): fixed output paths -- concurrent requests would share them.
    temp_dir = "piper_dataset_final"
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(temp_dir, exist_ok=True)

    logs = []
    csv_data = []
    file_counter = 0
    try:
        logs.append(f"📂 Đã chọn {len(audio_files)} file audio. Bắt đầu xử lý theo thứ tự...")

        # ---- VAD model (downloaded only once) ----
        vad_path = "silero_vad.onnx"
        _ensure_vad_model(vad_path, logs)

        vad_config = sherpa_onnx.VadModelConfig()
        vad_config.silero_vad.model = vad_path
        vad_config.silero_vad.min_speech_duration = min_speech_duration
        vad_config.silero_vad.min_silence_duration = min_silence_duration
        vad_config.sample_rate = ASR_SAMPLE_RATE
        vad_engine = sherpa_onnx.VoiceActivityDetector(vad_config, buffer_size_in_seconds=60)
        window_size = vad_config.silero_vad.window_size

        # The VAD only closes a segment after min_silence_duration of silence.
        # Pad each file with more than that, so its last utterance is emitted
        # for this file instead of being dropped or leaking into the next one
        # through the shared detector.
        trailing_silence = float(min_silence_duration) + 1.0

        # Process the files one by one, in the given order.
        for idx, audio_file in enumerate(audio_files, 1):
            original_name = os.path.splitext(os.path.basename(audio_file))[0]
            original_name = re.sub(r'[^a-zA-Z0-9_-]', '_', original_name)
            logs.append(f"🔄 Đang xử lý file {idx}/{len(audio_files)}: {original_name}")

            samples = _load_padded_samples(audio_file, trailing_silence)
            speech_segments = _collect_speech_segments(vad_engine, samples, window_size)

            # Transcribe and export each segment of this file.
            for chunk_samples in speech_segments:
                s = recognizer.create_stream()
                s.accept_waveform(ASR_SAMPLE_RATE, chunk_samples)
                recognizer.decode_stream(s)
                text = s.result.text.strip()
                # Skip empty or near-empty transcripts.
                if not text or len(text) <= 2:
                    continue

                filename = f"{original_name}_{file_counter:05d}.wav"
                filepath = os.path.join(temp_dir, filename)
                chunk_audio = AudioSegment(
                    (chunk_samples * 32767).astype(np.int16).tobytes(),
                    frame_rate=ASR_SAMPLE_RATE,
                    sample_width=2,
                    channels=1
                ).set_frame_rate(22050)
                chunk_audio.export(filepath, format="wav")
                csv_data.append([filename, text])
                file_counter += 1

        # ---- Write CSV + ZIP ----
        # NOTE(review): utf-8-sig puts a BOM before the first filename;
        # confirm the consumer of metadata.csv tolerates it.
        csv_path = os.path.join(temp_dir, "metadata.csv")
        with open(csv_path, mode='w', encoding='utf-8-sig', newline='') as f:
            writer = csv.writer(f, delimiter='|')
            writer.writerows(csv_data)

        zip_path = "dataset_piper_vad_v2.zip"
        if os.path.exists(zip_path):
            os.remove(zip_path)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(temp_dir):
                for file in files:
                    # Flat archive: entries are stored without directories.
                    zipf.write(os.path.join(root, file), arcname=file)

        logs.append(f"🎉 HOÀN TẤT! Đã xử lý {len(audio_files)} file → Tạo {file_counter} câu")
        return zip_path, "\n".join(logs)
    except Exception as e:
        # Keep the progress log so the user can see which step or file failed.
        return None, "\n".join(logs + [f"❌ Lỗi: {str(e)}"])
    finally:
        gc.collect()
# --- UI ---
# Two-column layout: inputs and run button on the left, log and download on the right.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="green")) as demo:
    gr.Markdown("# 🎙️ Piper Dataset Maker - VAD V2 (Hỗ trợ nhiều file)")
    gr.Markdown("Chọn nhiều file audio cùng lúc (giữ Ctrl để chọn nhiều). Metadata sẽ theo đúng thứ tự file bạn chọn.")

    with gr.Row():
        # Left column: file picker, VAD sliders and the run button.
        with gr.Column():
            audio_input = gr.File(
                file_count="multiple",
                type="filepath",
                label="📁 Chọn nhiều file audio (Ctrl + click để chọn nhiều)",
            )
            with gr.Row():
                min_speech = gr.Slider(
                    minimum=0.3,
                    maximum=1.5,
                    value=0.7,
                    label="Độ dài câu tối thiểu (s)",
                )
                min_silence = gr.Slider(
                    minimum=0.5,
                    maximum=3.0,
                    value=1.2,
                    label="Khoảng lặng tối thiểu để cắt (s)",
                )
            btn_run = gr.Button(value="🚀 BẮT ĐẦU TRÍCH XUẤT TẤT CẢ", variant="primary")

        # Right column: processing log and the resulting ZIP archive.
        with gr.Column():
            logs = gr.Textbox(lines=15, label="Nhật ký hệ thống")
            file_output = gr.File(label="📥 Tải bộ Dataset ZIP")

    # Wire the button to the processing function.
    btn_run.click(
        fn=process_audio_vad,
        inputs=[audio_input, min_speech, min_silence],
        outputs=[file_output, logs],
    )

if __name__ == "__main__":
    demo.launch()