import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Tuple

import gradio as gr
import librosa
import numpy as np
import pandas as pd
import soundfile as sf
from faster_whisper import WhisperModel

# Loaded once at module import: CPU + int8 keeps memory usage low enough for
# shared-host deployment at the cost of speed.
model = WhisperModel("large-v3-turbo", device="cpu", compute_type="int8")

# Audio container formats accepted both as direct upload and inside zips.
AUDIO_EXTENSIONS = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac'}


def extract_audio_files(input_file: str, temp_dir: str) -> List[str]:
    """Collect audio file paths from a single upload or a .zip archive.

    Args:
        input_file: Path to an audio file, or to a zip archive of audio files.
        temp_dir: Directory into which zip contents are extracted.

    Returns:
        Paths of all discovered audio files (may be empty).
    """
    audio_files: List[str] = []

    if input_file.endswith('.zip'):
        with zipfile.ZipFile(input_file, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        for root, _, files in os.walk(temp_dir):
            for file in files:
                # Skip macOS resource-fork junk ("__MACOSX/._foo.wav") that
                # ships inside zips created on macOS and crashes the decoder.
                if file.startswith('._'):
                    continue
                if Path(file).suffix.lower() in AUDIO_EXTENSIONS:
                    audio_files.append(os.path.join(root, file))
    elif Path(input_file).suffix.lower() in AUDIO_EXTENSIONS:
        audio_files.append(input_file)

    return audio_files


def transcribe_with_timestamps(audio_path: str) -> List[dict]:
    """Transcribe one audio file with Whisper, returning timestamped segments.

    Args:
        audio_path: Path to the audio file to transcribe.

    Returns:
        List of ``{'start': float, 'end': float, 'text': str}`` dicts,
        in chronological order.
    """
    segments, info = model.transcribe(
        audio_path,
        beam_size=5,
        vad_filter=True,  # drop long silences so segments hug actual speech
        vad_parameters=dict(min_silence_duration_ms=500),
    )
    return [
        {'start': segment.start, 'end': segment.end, 'text': segment.text.strip()}
        for segment in segments
    ]


def merge_short_segments(segments: List[dict], min_duration: float = 2.0) -> List[dict]:
    """Merge consecutive segments so each lasts at least ``min_duration`` seconds.

    Args:
        segments: Chronologically ordered segment dicts (see
            :func:`transcribe_with_timestamps`).
        min_duration: Minimum segment length in seconds.

    Returns:
        New list of merged segment dicts; input dicts are not mutated.
    """
    if not segments:
        return []

    merged: List[dict] = []
    current = segments[0].copy()

    for seg in segments[1:]:
        if current['end'] - current['start'] < min_duration:
            # Absorb the next segment into the still-too-short current one.
            current['end'] = seg['end']
            current['text'] = current['text'] + ' ' + seg['text']
        else:
            merged.append(current)
            current = seg.copy()

    # Bug fix: the trailing segment could previously remain shorter than
    # min_duration; fold it into its predecessor when one exists.
    if merged and current['end'] - current['start'] < min_duration:
        merged[-1]['end'] = current['end']
        merged[-1]['text'] = merged[-1]['text'] + ' ' + current['text']
    else:
        merged.append(current)

    return merged


def cut_audio_by_timestamps(audio_path: str, segments: List[dict],
                            output_dir: str, base_name: str) -> List[dict]:
    """Slice an audio file into per-segment wav files.

    Args:
        audio_path: Source audio file.
        segments: Timestamped segments to cut out.
        output_dir: Directory receiving the ``<base_name>_NNNNN.wav`` files.
        base_name: Stem used to name the output clips.

    Returns:
        One record per clip with keys ``audio_path``, ``transcription`` and
        ``file_name`` (dataset-relative ``audio/...`` path).
    """
    audio, sr = librosa.load(audio_path, sr=None)  # keep native sample rate
    audio_records: List[dict] = []

    for idx, seg in enumerate(segments):
        start_sample = int(seg['start'] * sr)
        # Clamp: VAD/decoder timestamps can slightly overshoot the signal end.
        end_sample = min(int(seg['end'] * sr), len(audio))
        audio_segment = audio[start_sample:end_sample]

        output_filename = f"{base_name}_{idx + 1:05d}.wav"
        output_path = os.path.join(output_dir, output_filename)
        sf.write(output_path, audio_segment, sr)

        audio_records.append({
            'audio_path': output_path,
            'transcription': seg['text'],
            'file_name': f"audio/{output_filename}",
        })

    return audio_records


def save_to_parquet(records: List[dict], output_dir: str, max_size_mb: int = 500):
    """Write records to HF-style parquet shard(s) capped at ``max_size_mb``.

    Args:
        records: Records produced by :func:`cut_audio_by_timestamps`.
        output_dir: Directory receiving ``train-XXXXX-of-XXXXX.parquet`` files.
        max_size_mb: Maximum size of a single shard in megabytes.

    Returns:
        List of written parquet file paths (empty when ``records`` is empty).
    """
    # Bug fix: an empty DataFrame has no 'audio_path' column and previously
    # raised KeyError; return cleanly instead.
    if not records:
        return []

    df = pd.DataFrame(records)
    # Embed raw audio bytes so the parquet is self-contained.
    df['audio'] = [Path(path).read_bytes() for path in df['audio_path']]
    df = df[['audio', 'transcription', 'file_name']]

    # Write the single-shard file once and measure it; the original wrote a
    # throwaway temp file and then serialized the same data a second time.
    single_path = os.path.join(output_dir, 'train-00000-of-00001.parquet')
    df.to_parquet(single_path, engine='pyarrow')
    file_size_mb = os.path.getsize(single_path) / (1024 * 1024)
    if file_size_mb <= max_size_mb:
        return [single_path]

    os.remove(single_path)
    num_parts = int(np.ceil(file_size_mb / max_size_mb))
    num_parts = min(num_parts, len(df))  # never more shards than rows

    # Bug fix: the old "len(df)//num_parts + 1" chunking could emit empty
    # shards; evenly spaced boundaries guarantee every shard is non-empty.
    bounds = np.linspace(0, len(df), num_parts + 1, dtype=int)
    parquet_files: List[str] = []
    for i in range(num_parts):
        chunk = df.iloc[bounds[i]:bounds[i + 1]]
        output_path = os.path.join(
            output_dir, f'train-{i:05d}-of-{num_parts:05d}.parquet')
        chunk.to_parquet(output_path, engine='pyarrow')
        parquet_files.append(output_path)

    return parquet_files


def get_sample_files() -> List[str]:
    """Return demo files shipped in the local ``Sample`` directory, if any."""
    sample_dir = "Sample"
    if not os.path.exists(sample_dir):
        return []

    accepted = AUDIO_EXTENSIONS | {'.zip'}
    return [
        os.path.join(sample_dir, file)
        for file in os.listdir(sample_dir)
        if Path(file).suffix.lower() in accepted
    ]


def process_audio(input_file):
    """Gradio handler: transcribe, segment and package the upload as a dataset.

    Args:
        input_file: Path of the uploaded audio/zip file, or ``None``.

    Returns:
        ``(zip_path, status_message)`` — ``zip_path`` is ``None`` on failure.
    """
    if input_file is None:
        return None, "Vui lòng upload file audio hoặc file zip!"

    with tempfile.TemporaryDirectory() as temp_dir:
        extract_dir = os.path.join(temp_dir, 'extracted')
        audio_output_dir = os.path.join(temp_dir, 'audio')
        final_output_dir = os.path.join(temp_dir, 'output')
        os.makedirs(extract_dir, exist_ok=True)
        os.makedirs(audio_output_dir, exist_ok=True)
        os.makedirs(final_output_dir, exist_ok=True)

        audio_files = extract_audio_files(input_file, extract_dir)
        if not audio_files:
            return None, "Không tìm thấy file audio nào!"

        all_records = []
        for audio_file in audio_files:
            base_name = Path(audio_file).stem
            segments = transcribe_with_timestamps(audio_file)
            merged_segments = merge_short_segments(segments, min_duration=2.0)
            records = cut_audio_by_timestamps(
                audio_file, merged_segments, audio_output_dir, base_name
            )
            all_records.extend(records)

        # Bug fix: previously an input with no speech crashed in
        # save_to_parquet instead of reporting a clean error.
        if not all_records:
            return None, "Không tạo được segment nào từ audio!"

        parquet_files = save_to_parquet(all_records, final_output_dir)

        final_audio_dir = os.path.join(final_output_dir, 'audio')
        shutil.copytree(audio_output_dir, final_audio_dir)

        # Package clips under audio/ plus the parquet shards at archive root.
        zip_path = os.path.join(temp_dir, 'dataset_output.zip')
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(final_audio_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.join('audio', file)
                    zipf.write(file_path, arcname)
            for pq_file in parquet_files:
                zipf.write(pq_file, os.path.basename(pq_file))

        # Bug fix: a fixed path in gettempdir() let concurrent requests
        # clobber each other's downloads; a fresh per-call directory keeps
        # the user-visible basename while isolating each result. The temp
        # working directory is cleaned up when the `with` block exits.
        final_zip = os.path.join(tempfile.mkdtemp(), 'dataset_output.zip')
        shutil.copy(zip_path, final_zip)

        summary = f"""
✅ Xử lý thành công!
- Số file audio đầu vào: {len(audio_files)}
- Số segment đã tạo: {len(all_records)}
- Số file parquet: {len(parquet_files)}
- File zip đầu ra: dataset_output.zip
"""
        return final_zip, summary


with gr.Blocks(title="Audio Transcription & Dataset Creator") as app:
    gr.Markdown("""
    # 🎙️ Audio Transcription & Dataset Creator

    Upload file audio hoặc file zip chứa nhiều file audio. Hệ thống sẽ:
    1. Transcribe bằng Whisper Large-v3-Turbo
    2. Cắt audio theo timestamps (gộp câu ngắn)
    3. Tạo dataset Parquet chuẩn với audio bytes
    """)

    with gr.Row():
        with gr.Column():
            input_file = gr.File(
                label="Upload Audio File hoặc ZIP",
                file_types=['.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac', '.zip'],
            )

            sample_files = get_sample_files()
            if sample_files:
                gr.Markdown("### 📂 Hoặc chọn file mẫu:")
                sample_dropdown = gr.Dropdown(
                    choices=sample_files,
                    label="Chọn file mẫu",
                    interactive=True,
                )
                load_sample_btn = gr.Button("📥 Load file mẫu", variant="secondary")

            process_btn = gr.Button("🚀 Bắt đầu xử lý", variant="primary")

        with gr.Column():
            output_file = gr.File(label="📦 Tải về Dataset ZIP")
            status_text = gr.Textbox(label="📊 Trạng thái", lines=8)

    if sample_files:
        def load_sample(sample_path):
            # Pass the selected sample path straight into the upload widget.
            return sample_path

        load_sample_btn.click(
            fn=load_sample,
            inputs=sample_dropdown,
            outputs=input_file,
        )

    process_btn.click(
        fn=process_audio,
        inputs=input_file,
        outputs=[output_file, status_text],
    )

    gr.Markdown("""
    ### 📝 Ghi chú:
    - Dataset Parquet sẽ được chia nhỏ nếu > 500MB
    - Cột `audio`: audio bytes (binary)
    - Cột `transcription`: văn bản transcription
    - Cột `file_name`: đường dẫn dạng `audio/filename_00001.wav`
    - Các câu ngắn (< 2s) sẽ được gộp lại
    - Format tên: train-00000-of-00007.parquet
    """)


if __name__ == "__main__":
    app.launch()