# Spaces: Running
| import gradio as gr | |
| import os | |
| import zipfile | |
| import tempfile | |
| from pathlib import Path | |
| from faster_whisper import WhisperModel | |
| import librosa | |
| import soundfile as sf | |
| import pandas as pd | |
| import numpy as np | |
| from typing import List, Tuple | |
| import shutil | |
# Single shared Whisper model instance, created once at import time and reused
# by every transcription request (CPU device, int8 compute type).
model = WhisperModel(
    "large-v3-turbo",
    device="cpu",
    compute_type="int8",
)
def extract_audio_files(input_file: str, temp_dir: str) -> List[str]:
    """Return the audio files contained in, or designated by, ``input_file``.

    Args:
        input_file: Path to a single audio file or to a zip archive.
        temp_dir: Directory the archive is extracted into (used only when
            ``input_file`` is a zip archive).

    Returns:
        A sorted list of paths to the extracted audio files when
        ``input_file`` is a zip archive; a one-element list holding
        ``input_file`` itself when it is a supported audio file; otherwise
        an empty list.
    """
    audio_extensions = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac'}
    # Compare the suffix case-insensitively so "DATA.ZIP" is handled the same
    # way as "data.zip", matching how audio extensions are checked.
    suffix = Path(input_file).suffix.lower()

    if suffix != '.zip':
        return [input_file] if suffix in audio_extensions else []

    with zipfile.ZipFile(input_file, 'r') as zip_ref:
        zip_ref.extractall(temp_dir)

    audio_files = []
    for root, dirs, files in os.walk(temp_dir):
        # Archives created on macOS carry a "__MACOSX" folder of "._name"
        # metadata stubs that reuse the audio extension but are not audio.
        dirs[:] = [d for d in dirs if d != '__MACOSX']
        for file in files:
            if file.startswith('._'):
                continue
            if Path(file).suffix.lower() in audio_extensions:
                audio_files.append(os.path.join(root, file))

    # os.walk order depends on the file system; sort for reproducible output.
    audio_files.sort()
    return audio_files
def transcribe_with_timestamps(audio_path: str) -> List[dict]:
    """Transcribe one audio file with the module-level ``model``.

    The model is called with beam size 5 and the VAD filter enabled
    (minimum silence duration of 500 ms).

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        One dict per segment yielded by the model, with keys ``'start'``,
        ``'end'`` (taken from the segment as-is) and ``'text'`` (the
        segment text with surrounding whitespace stripped).
    """
    vad_options = dict(min_silence_duration_ms=500)
    # The second element returned by transcribe() is not needed here.
    segments, _ = model.transcribe(
        audio_path,
        beam_size=5,
        vad_filter=True,
        vad_parameters=vad_options,
    )
    return [
        {
            'start': piece.start,
            'end': piece.end,
            'text': piece.text.strip(),
        }
        for piece in segments
    ]
def merge_short_segments(segments: List[dict], min_duration: float = 2.0) -> List[dict]:
    """Merge consecutive segments until each is at least ``min_duration`` long.

    A segment shorter than ``min_duration`` absorbs the segments that follow
    it (its end time is extended and the texts are joined with a space)
    until it is long enough. A short segment left over at the very end is
    folded into the preceding merged segment instead of being emitted on its
    own. The input dicts are never mutated.

    Args:
        segments: Dicts with ``'start'``, ``'end'`` and ``'text'`` keys, in
            chronological order.
        min_duration: Minimum duration a merged segment should reach, in the
            same unit as ``'start'``/``'end'``.

    Returns:
        A new list of merged segment dicts; empty when ``segments`` is empty.
        A lone segment is returned unchanged even if it is short.
    """
    if not segments:
        return []

    def _absorb(target: dict, extra: dict) -> None:
        # Extend ``target`` over ``extra``; skip empty texts so no stray
        # leading/trailing space ends up in the transcription.
        target['end'] = extra['end']
        target['text'] = ' '.join(
            part for part in (target['text'], extra['text']) if part
        )

    merged = []
    current = segments[0].copy()
    for seg in segments[1:]:
        if current['end'] - current['start'] < min_duration:
            _absorb(current, seg)
        else:
            merged.append(current)
            current = seg.copy()

    # The loop can only grow a short segment forward; a short tail has no
    # successor, so attach it to the previous segment when there is one.
    if merged and current['end'] - current['start'] < min_duration:
        _absorb(merged[-1], current)
    else:
        merged.append(current)
    return merged
def cut_audio_by_timestamps(audio_path: str, segments: List[dict], output_dir: str, base_name: str) -> List[dict]:
    """Cut ``audio_path`` into one WAV file per segment.

    Args:
        audio_path: Audio file to load (at its native sample rate).
        segments: Dicts with ``'start'``/``'end'`` times in seconds and a
            ``'text'`` transcription.
        output_dir: Existing directory the clips are written to.
        base_name: Prefix of the clip file names; clips are named
            ``{base_name}_{index:05d}.wav`` with a 1-based segment index.

    Returns:
        One record per written clip with keys ``'audio_path'`` (path of the
        clip), ``'transcription'`` (the segment text) and ``'file_name'``
        (``audio/<clip name>``). Segments that contain no samples after
        clamping to the audio bounds are skipped.
    """
    if not segments:
        # Nothing to cut: avoid decoding the whole file for no output.
        return []

    audio, sr = librosa.load(audio_path, sr=None)
    total_samples = len(audio)
    audio_records = []

    for idx, seg in enumerate(segments, start=1):
        # Clamp to [0, total_samples]: a negative start would otherwise wrap
        # around in the slice, and timestamps may overshoot the audio length.
        start_sample = min(max(int(seg['start'] * sr), 0), total_samples)
        end_sample = min(max(int(seg['end'] * sr), start_sample), total_samples)
        if end_sample <= start_sample:
            # An empty clip would become a sample-less WAV file in the dataset.
            continue

        output_filename = f"{base_name}_{idx:05d}.wav"
        output_path = os.path.join(output_dir, output_filename)
        sf.write(output_path, audio[start_sample:end_sample], sr)
        audio_records.append({
            'audio_path': output_path,
            'transcription': seg['text'],
            'file_name': f"audio/{output_filename}"
        })

    return audio_records
def save_to_parquet(records: List[dict], output_dir: str, max_size_mb: int = 500) -> List[str]:
    """Write ``records`` to one or more Parquet files in ``output_dir``.

    Each record's audio file is read into an ``audio`` bytes column; the
    written columns are ``audio``, ``transcription`` and ``file_name``.
    The whole table is serialized once to measure its size. If it fits in
    ``max_size_mb`` that file becomes ``train-00000-of-00001.parquet``;
    otherwise the rows are split evenly into
    ``train-{i:05d}-of-{n:05d}.parquet`` shards.

    Args:
        records: Dicts with ``'audio_path'``, ``'transcription'`` and
            ``'file_name'`` keys.
        output_dir: Existing directory the Parquet files are written to.
        max_size_mb: Size threshold, in MiB, above which the data is sharded.

    Returns:
        Paths of the Parquet files written; empty when ``records`` is empty.
    """
    if not records:
        # An empty DataFrame has no 'audio_path' column to iterate over.
        return []

    df = pd.DataFrame(records)
    audio_data = []
    for path in df['audio_path']:
        with open(path, 'rb') as f:
            audio_data.append(f.read())
    df['audio'] = audio_data
    df = df[['audio', 'transcription', 'file_name']]

    # Serialize once to learn the on-disk size.
    temp_path = os.path.join(output_dir, 'temp.parquet')
    df.to_parquet(temp_path, engine='pyarrow')
    file_size_mb = os.path.getsize(temp_path) / (1024 * 1024)

    if file_size_mb <= max_size_mb:
        # The probe file already holds the full table: rename, don't rewrite.
        output_path = os.path.join(output_dir, 'train-00000-of-00001.parquet')
        os.replace(temp_path, output_path)
        return [output_path]

    os.remove(temp_path)
    total_rows = len(df)
    # Never plan more shards than rows, so no shard can be empty.
    num_parts = min(int(np.ceil(file_size_mb / max_size_mb)), total_rows)

    parquet_files = []
    for i in range(num_parts):
        # Integer boundaries spread the rows as evenly as possible and always
        # end exactly at total_rows.
        start_idx = i * total_rows // num_parts
        end_idx = (i + 1) * total_rows // num_parts
        output_path = os.path.join(output_dir, f'train-{i:05d}-of-{num_parts:05d}.parquet')
        df.iloc[start_idx:end_idx].to_parquet(output_path, engine='pyarrow')
        parquet_files.append(output_path)

    return parquet_files
def get_sample_files():
    """List the sample inputs available in the local ``Sample`` directory.

    Returns:
        Paths (``Sample/<name>``) of the directory entries whose extension,
        compared case-insensitively, is a supported audio format or ``.zip``.
        An empty list when the ``Sample`` path does not exist.
    """
    sample_dir = "Sample"
    if not os.path.exists(sample_dir):
        return []

    accepted = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac', '.zip'}
    return [
        os.path.join(sample_dir, entry)
        for entry in os.listdir(sample_dir)
        if Path(entry).suffix.lower() in accepted
    ]
def process_audio(input_file):
    """Run the full pipeline on an uploaded audio file or zip archive.

    Steps: collect the audio files, transcribe each one, merge short
    segments, cut one WAV clip per segment, write the Parquet dataset and
    bundle clips plus Parquet files into ``dataset_output.zip``.

    Args:
        input_file: Value of the upload component: a file path, an object
            exposing the path as ``.name``, or ``None`` when nothing was
            uploaded.

    Returns:
        A ``(zip_path, message)`` tuple. ``zip_path`` is ``None`` when there
        is nothing to process (no upload, no audio file found, or no segment
        produced); ``message`` is the user-facing status text.
    """
    if input_file is None:
        return None, "Vui lòng upload file audio hoặc file zip!"

    # NOTE(review): some Gradio versions appear to pass gr.File values as
    # file-like objects carrying the path in ``.name`` rather than as plain
    # strings; accept both. Plain strings have no ``name`` attribute.
    input_path = getattr(input_file, 'name', input_file)

    with tempfile.TemporaryDirectory() as temp_dir:
        extract_dir = os.path.join(temp_dir, 'extracted')
        audio_output_dir = os.path.join(temp_dir, 'audio')
        final_output_dir = os.path.join(temp_dir, 'output')
        for directory in (extract_dir, audio_output_dir, final_output_dir):
            os.makedirs(directory, exist_ok=True)

        audio_files = extract_audio_files(input_path, extract_dir)
        if not audio_files:
            return None, "Không tìm thấy file audio nào!"

        all_records = []
        used_names = set()
        for audio_file in audio_files:
            # Files in different archive folders may share a stem; give each
            # a unique prefix so their clips do not overwrite one another.
            stem = Path(audio_file).stem
            base_name = stem
            attempt = 1
            while base_name in used_names:
                attempt += 1
                base_name = f"{stem}_{attempt}"
            used_names.add(base_name)

            segments = transcribe_with_timestamps(audio_file)
            merged_segments = merge_short_segments(segments, min_duration=2.0)
            records = cut_audio_by_timestamps(
                audio_file,
                merged_segments,
                audio_output_dir,
                base_name
            )
            all_records.extend(records)

        if not all_records:
            # Nothing was transcribed (e.g. silent audio): there is no
            # dataset to build, so report it instead of failing downstream.
            return None, "Không tạo được segment nào từ audio đầu vào!"

        parquet_files = save_to_parquet(all_records, final_output_dir)

        # Write the archive to a per-request directory that outlives
        # ``temp_dir``; a fixed shared path would let concurrent requests
        # overwrite each other's download.
        download_dir = tempfile.mkdtemp(prefix='dataset_')
        final_zip = os.path.join(download_dir, 'dataset_output.zip')
        with zipfile.ZipFile(final_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for record in all_records:
                # 'file_name' is already the in-archive path "audio/<clip>".
                zipf.write(record['audio_path'], record['file_name'])
            for pq_file in parquet_files:
                zipf.write(pq_file, os.path.basename(pq_file))

        summary = f"""
✅ Xử lý thành công!
- Số file audio đầu vào: {len(audio_files)}
- Số segment đã tạo: {len(all_records)}
- Số file parquet: {len(parquet_files)}
- File zip đầu ra: dataset_output.zip
"""
        return final_zip, summary
# Gradio UI: upload (or sample picker) on the left, results on the right.
with gr.Blocks(title="Audio Transcription & Dataset Creator") as app:
    gr.Markdown("""
# 🎙️ Audio Transcription & Dataset Creator
Upload file audio hoặc file zip chứa nhiều file audio.
Hệ thống sẽ:
1. Transcribe bằng Whisper Large-v3-Turbo
2. Cắt audio theo timestamps (gộp câu ngắn)
3. Tạo dataset Parquet chuẩn với audio bytes
""")

    with gr.Row():
        # Left column: input selection and the action button.
        with gr.Column():
            accepted_upload_types = ['.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac', '.zip']
            input_file = gr.File(
                label="Upload Audio File hoặc ZIP",
                file_types=accepted_upload_types,
            )

            # The sample picker is only rendered when sample files exist.
            sample_files = get_sample_files()
            if sample_files:
                gr.Markdown("### 📂 Hoặc chọn file mẫu:")
                sample_dropdown = gr.Dropdown(
                    choices=sample_files,
                    label="Chọn file mẫu",
                    interactive=True,
                )
                load_sample_btn = gr.Button("📥 Load file mẫu", variant="secondary")

            process_btn = gr.Button("🚀 Bắt đầu xử lý", variant="primary")

        # Right column: downloadable archive and status text.
        with gr.Column():
            output_file = gr.File(label="📦 Tải về Dataset ZIP")
            status_text = gr.Textbox(label="📊 Trạng thái", lines=8)

    if sample_files:
        def load_sample(sample_path):
            """Pass the selected sample path through to the upload component."""
            return sample_path

        load_sample_btn.click(
            load_sample,
            inputs=sample_dropdown,
            outputs=input_file,
        )

    process_btn.click(
        process_audio,
        inputs=input_file,
        outputs=[output_file, status_text],
    )

    gr.Markdown("""
### 📝 Ghi chú:
- Dataset Parquet sẽ được chia nhỏ nếu > 500MB
- Cột `audio`: audio bytes (binary)
- Cột `transcription`: văn bản transcription
- Cột `file_name`: đường dẫn dạng `audio/filename_00001.wav`
- Các câu ngắn (< 2s) sẽ được gộp lại
- Format tên: train-00000-of-00007.parquet
""")

# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    app.launch()