Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| # app.py - Modified for Piper Dataset Creation (Smart Split) | |
| import logging | |
| import os | |
| import tempfile | |
| import shutil | |
| import zipfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| import gradio as gr | |
| import torch | |
| import torchaudio | |
| from pydub import AudioSegment, silence, effects # Thêm effects để normalize | |
| # Import từ file model của bạn (giữ nguyên cấu trúc cũ) | |
| from examples import examples | |
| from model import ( | |
| decode, | |
| get_pretrained_model, | |
| get_punct_model, | |
| language_to_models, | |
| ) | |
| # Cấu hình log | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") | |
| def MyPrint(s): | |
| now = datetime.now() | |
| date_time = now.strftime("%Y-%m-%d %H:%M:%S") | |
| print(f"{date_time}: {s}") | |
| # --- CÁC HÀM XỬ LÝ AUDIO THÔNG MINH --- | |
| def smart_split_audio( | |
| in_filename: str, | |
| output_dir: str, | |
| base_name: str, | |
| min_silence_len=500, # Độ dài khoảng lặng để xác định là "ngắt câu" | |
| silence_thresh=-40, # Ngưỡng dB | |
| max_len_sec=12, # Độ dài tối đa cho phép của 1 đoạn (Piper tốt nhất < 15s) | |
| keep_silence=300 # Giữ lại bao nhiêu ms khoảng lặng ở đầu/cuối (Padding) | |
| ): | |
| """ | |
| Cắt audio thông minh: | |
| 1. Normalize audio để ngưỡng dB hoạt động đúng. | |
| 2. Dùng detect_nonsilent để tìm các đoạn CÓ TIẾNG. | |
| 3. Gộp các đoạn tiếng gần nhau (cách nhau < min_silence_len) thành 1 câu. | |
| 4. Thêm padding (keep_silence) để không bị cụt chữ. | |
| """ | |
| try: | |
| # 1. Load và Normalize Audio | |
| sound = AudioSegment.from_file(in_filename) | |
| sound = sound.set_frame_rate(16000).set_channels(1) | |
| # Normalize giúp âm lượng đồng đều, tránh việc đoạn nhỏ bị cắt nhầm | |
| sound = effects.normalize(sound) | |
| # Vì đã normalize, ngưỡng silence_thresh nên chỉnh lại tương đối | |
| # Tuy nhiên để người dùng dễ chỉnh, ta dùng tham số truyền vào nhưng tính toán lại một chút | |
| # Nếu audio đã normalize, max volume là 0dBFS. Ngưỡng cắt nên khoảng -40 đến -50. | |
| MyPrint(f"Đang phân tích audio: {in_filename} | Độ dài: {len(sound)/1000}s") | |
| # 2. Phát hiện các đoạn có tiếng (Non-silent chunks) | |
| # seek_step=10: bước nhảy 10ms giúp xử lý nhanh hơn | |
| nonsilent_ranges = silence.detect_nonsilent( | |
| sound, | |
| min_silence_len=min_silence_len, | |
| silence_thresh=silence_thresh, | |
| seek_step=10 | |
| ) | |
| if not nonsilent_ranges: | |
| MyPrint(f"⚠️ Không tìm thấy giọng nói trong file {in_filename}. Kiểm tra lại ngưỡng dB.") | |
| return [] | |
| # 3. Thuật toán Gộp đoạn (Merging Logic) | |
| # Mục tiêu: Không để các từ bị rời rạc. Nếu khoảng cách giữa 2 từ < min_silence_len, gộp chúng lại. | |
| # Ở đây detect_nonsilent đã làm việc đó dựa trên min_silence_len. | |
| # Tuy nhiên, ta cần kiểm tra độ dài tổng để đảm bảo không quá ngắn hoặc quá dài. | |
| output_files = [] | |
| chunk_count = 0 | |
| # Xử lý từng khoảng thời gian tìm được | |
| for start_i, end_i in nonsilent_ranges: | |
| # Tính toán padding để âm thanh nghe "tròn" hơn | |
| # Lùi điểm đầu lại một chút (start - keep_silence) | |
| # Kéo điểm cuối ra một chút (end + keep_silence) | |
| adj_start = max(0, start_i - keep_silence) | |
| adj_end = min(len(sound), end_i + keep_silence) | |
| chunk_duration = (adj_end - adj_start) / 1000.0 | |
| # Lọc rác: Bỏ qua đoạn quá ngắn (< 0.3s) - thường là tiếng click chuột hoặc noise | |
| if chunk_duration < 0.3: | |
| continue | |
| # Xử lý đoạn quá dài: Nếu dài hơn max_len_sec, Piper sẽ khó học. | |
| # Với script đơn giản này, ta chấp nhận lưu, nhưng cảnh báo. | |
| # (Giải pháp nâng cao là dùng VAD AI để cắt nhỏ hơn, nhưng ở đây dùng pydub cho nhẹ) | |
| if chunk_duration > max_len_sec: | |
| MyPrint(f"⚠️ Chunk {chunk_count} hơi dài: {chunk_duration:.1f}s (Khuyên dùng < {max_len_sec}s)") | |
| chunk = sound[adj_start:adj_end] | |
| # Fade in/out cực nhẹ (10ms) để tránh tiếng "bụp" ở đầu/cuối file | |
| chunk = chunk.fade_in(10).fade_out(10) | |
| # Xuất file | |
| out_name = f"{base_name}_{chunk_count:04d}.wav" | |
| out_path = os.path.join(output_dir, out_name) | |
| chunk.export(out_path, format="wav", parameters=["-ac", "1", "-ar", "16000"]) | |
| output_files.append(out_path) | |
| chunk_count += 1 | |
| return output_files | |
| except Exception as e: | |
| MyPrint(f"Lỗi khi cắt file {in_filename}: {e}") | |
| return [] | |
| # --- HÀM XỬ LÝ BATCH --- | |
| def process_batch_files( | |
| language: str, | |
| repo_id: str, | |
| decoding_method: str, | |
| num_active_paths: int, | |
| uploaded_files: list, | |
| silence_thresh: int, | |
| min_silence_len: int, | |
| progress=gr.Progress() | |
| ): | |
| if not uploaded_files: | |
| return None, "Vui lòng chọn ít nhất một file audio." | |
| MyPrint(f"Bắt đầu xử lý: {len(uploaded_files)} files gốc.") | |
| # Lưu ý: threshold càng âm thì càng nhạy (ví dụ -50 nhạy hơn -30). | |
| # Nhưng vì ta đã normalize audio, nên threshold -40 đến -50 là chuẩn. | |
| MyPrint(f"Cấu hình cắt: Thresh={silence_thresh}dB | Min Gap={min_silence_len}ms") | |
| tmp_dir = tempfile.mkdtemp() | |
| wavs_dir = os.path.join(tmp_dir, "wavs") | |
| os.makedirs(wavs_dir, exist_ok=True) | |
| csv_path = os.path.join(tmp_dir, "metadata.csv") | |
| # Load Model Sherpa | |
| try: | |
| recognizer = get_pretrained_model( | |
| repo_id, | |
| decoding_method=decoding_method, | |
| num_active_paths=num_active_paths, | |
| ) | |
| except Exception as e: | |
| return None, f"Lỗi tải model: {str(e)}" | |
| results_metadata = [] | |
| total_chunks = 0 | |
| for file_obj in progress.tqdm(uploaded_files, desc="Đang cắt & Nhận dạng..."): | |
| in_path = file_obj.name | |
| base_name = Path(in_path).stem | |
| # --- SỬ DỤNG HÀM CẮT MỚI --- | |
| chunk_paths = smart_split_audio( | |
| in_path, | |
| wavs_dir, | |
| base_name, | |
| min_silence_len=min_silence_len, # Khoảng lặng tối thiểu để tính là hết câu | |
| silence_thresh=silence_thresh, # Ngưỡng ồn | |
| keep_silence=300 # Thêm 300ms đệm đầu/cuối để TRÒN CHỮ | |
| ) | |
| for chunk_path in chunk_paths: | |
| try: | |
| text = decode(recognizer, chunk_path) | |
| text = text.strip() | |
| # Logic làm sạch text | |
| # Bỏ qua các đoạn text quá ngắn (thường là halluncination/ảo giác của AI) | |
| if len(text) > 1: | |
| wav_filename = os.path.basename(chunk_path) | |
| # Piper format: filename|transcript | |
| line = f"{wav_filename}|{text}" | |
| results_metadata.append(line) | |
| total_chunks += 1 | |
| else: | |
| # Xóa file rác (âm thanh tiếng thở, click chuột mà AI không dịch ra chữ) | |
| os.remove(chunk_path) | |
| except Exception as e: | |
| MyPrint(f"Lỗi decode {chunk_path}: {e}") | |
| # Ghi file metadata.csv | |
| with open(csv_path, "w", encoding="utf-8") as f: | |
| for line in results_metadata: | |
| f.write(line + "\n") | |
| MyPrint(f"Hoàn tất! Tổng số mẫu: {total_chunks}") | |
| # Nén zip | |
| zip_filename = f"piper_dataset_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" | |
| zip_path = os.path.join(tempfile.gettempdir(), zip_filename) | |
| shutil.make_archive(zip_path.replace('.zip', ''), 'zip', tmp_dir) | |
| info_text = ( | |
| f"✅ Xử lý hoàn tất!\n" | |
| f"- Tổng số câu (segments): {total_chunks}\n" | |
| f"- File đã được Normalize và thêm Padding (đệm) để tròn chữ.\n" | |
| f"- Tải file .zip bên dưới để train Piper." | |
| ) | |
| return zip_path, info_text | |
| def update_model_dropdown(language: str): | |
| if language in language_to_models: | |
| choices = language_to_models[language] | |
| return gr.Dropdown(choices=choices, value=choices[0], interactive=True) | |
| raise ValueError(f"Unsupported language: {language}") | |
| # --- GIAO DIỆN GRADIO --- | |
| css = """ | |
| .result {display:flex;flex-direction:column} | |
| """ | |
| with gr.Blocks(css=css, title="Auto Piper Dataset Maker (Smart Split)") as demo: | |
| gr.Markdown("# ✂️ Auto Piper Dataset Maker (Smart Split)") | |
| gr.Markdown("Tự động cắt audio, chuẩn hóa âm lượng và thêm vùng đệm để **không bị mất chữ**.") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 1. Cấu hình Model ASR") | |
| language_choices = list(language_to_models.keys()) | |
| language_radio = gr.Radio( | |
| label="Ngôn ngữ nhận dạng", | |
| choices=language_choices, | |
| value=language_choices[0], | |
| ) | |
| model_dropdown = gr.Dropdown( | |
| choices=language_to_models[language_choices[0]], | |
| label="Chọn Model Sherpa-ONNX", | |
| value=language_to_models[language_choices[0]][0], | |
| ) | |
| language_radio.change(update_model_dropdown, inputs=language_radio, outputs=model_dropdown) | |
| gr.Markdown("---") | |
| gr.Markdown("### 2. Cấu hình Cắt Audio") | |
| gr.Markdown("*(Đã bật chế độ Normalize và Smart Padding)*") | |
| silence_thresh_slider = gr.Slider( | |
| minimum=-60, maximum=-20, value=-45, step=1, | |
| label="Ngưỡng khoảng lặng (dB)", | |
| info="Mặc định -45dB. Nếu file bị cắt vụn quá nhiều câu ngắn, hãy giảm xuống -50 hoặc -55." | |
| ) | |
| min_silence_slider = gr.Slider( | |
| minimum=200, maximum=2000, value=700, step=100, | |
| label="Độ dài ngắt câu (ms)", | |
| info="Khoảng lặng phải dài hơn số này mới được tính là hết câu. Tăng lên (800-1000) để câu dài hơn." | |
| ) | |
| with gr.Column(scale=2): | |
| gr.Markdown("### 3. Upload & Xử lý") | |
| files_input = gr.File( | |
| label="Upload Audio gốc", | |
| file_count="multiple", | |
| type="filepath" | |
| ) | |
| batch_btn = gr.Button("🚀 Cắt Audio & Tạo Dataset", variant="primary") | |
| status_output = gr.Textbox(label="Log Kết quả", lines=5) | |
| file_output = gr.File(label="Download Dataset (.zip)") | |
| decoding_method_state = gr.State("modified_beam_search") | |
| num_active_paths_state = gr.State(4) | |
| batch_btn.click( | |
| process_batch_files, | |
| inputs=[ | |
| language_radio, | |
| model_dropdown, | |
| decoding_method_state, | |
| num_active_paths_state, | |
| files_input, | |
| silence_thresh_slider, | |
| min_silence_slider | |
| ], | |
| outputs=[file_output, status_output], | |
| ) | |
| torch.set_num_threads(1) | |
| torch.set_num_interop_threads(1) | |
| if __name__ == "__main__": | |
| demo.launch(share=True) |