# TTSDatasets / app.py
# (Hugging Face Space file — author: Pragmaticl, commit message: "Update app.py",
#  commit 210017d, verified)
import gradio as gr
import os
import zipfile
import tempfile
from pathlib import Path
from faster_whisper import WhisperModel
import librosa
import soundfile as sf
import pandas as pd
import numpy as np
from typing import List, Tuple
import shutil
# Global ASR model, loaded once at import time and shared by all requests.
# CPU + int8 quantization keeps memory/compute low enough for a free-tier Space;
# "large-v3-turbo" trades a little accuracy for much faster decoding.
model = WhisperModel("large-v3-turbo", device="cpu", compute_type="int8")
def extract_audio_files(input_file: str, temp_dir: str) -> List[str]:
    """Collect audio file paths from a single audio file or a ZIP archive.

    Args:
        input_file: Path to an audio file or to a ``.zip`` archive containing
            audio files.
        temp_dir: Directory into which ZIP contents are extracted.

    Returns:
        Sorted list of paths to discovered audio files; empty if the input
        is neither a supported audio file nor a ZIP archive.
    """
    audio_files: List[str] = []
    audio_extensions = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac'}
    # Compare via suffix.lower() so '.ZIP'/'.Zip' archives are recognized too.
    # The previous endswith('.zip') check was case-sensitive, inconsistent with
    # the case-insensitive audio-extension checks used elsewhere in this file.
    suffix = Path(input_file).suffix.lower()
    if suffix == '.zip':
        with zipfile.ZipFile(input_file, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        for root, _, files in os.walk(temp_dir):
            for file in files:
                if Path(file).suffix.lower() in audio_extensions:
                    audio_files.append(os.path.join(root, file))
    elif suffix in audio_extensions:
        audio_files.append(input_file)
    # os.walk order is filesystem-dependent; sort for deterministic processing.
    return sorted(audio_files)
def transcribe_with_timestamps(audio_path: str) -> List[dict]:
    """Run Whisper on one audio file and return its timed speech segments.

    Args:
        audio_path: Path to the audio file to transcribe.

    Returns:
        List of dicts with ``start``/``end`` (seconds, float) and ``text``
        (whitespace-stripped transcription) keys, one per detected segment.
    """
    # VAD filtering drops long silences (>= 500 ms) before decoding.
    segments, _info = model.transcribe(
        audio_path,
        beam_size=5,
        vad_filter=True,
        vad_parameters=dict(min_silence_duration_ms=500),
    )
    return [
        {'start': seg.start, 'end': seg.end, 'text': seg.text.strip()}
        for seg in segments
    ]
def merge_short_segments(segments: List[dict], min_duration: float = 2.0) -> List[dict]:
    """Greedily merge consecutive segments until each is long enough.

    A segment shorter than ``min_duration`` seconds absorbs the following
    segment (extending its end time and concatenating the text); input
    dicts are never mutated.

    Args:
        segments: Ordered segment dicts with 'start', 'end', 'text' keys.
        min_duration: Minimum target duration in seconds.

    Returns:
        New list of merged segment dicts; the final segment may still be
        shorter than ``min_duration`` if nothing follows it.
    """
    if not segments:
        return []
    merged: List[dict] = []
    pending = dict(segments[0])
    for nxt in segments[1:]:
        if pending['end'] - pending['start'] < min_duration:
            # Too short: absorb the next segment into the pending one.
            pending['end'] = nxt['end']
            pending['text'] = pending['text'] + ' ' + nxt['text']
        else:
            merged.append(pending)
            pending = dict(nxt)
    merged.append(pending)
    return merged
def cut_audio_by_timestamps(audio_path: str, segments: List[dict], output_dir: str, base_name: str) -> List[dict]:
    """Slice one audio file into WAV clips, one clip per segment.

    Args:
        audio_path: Source audio file to cut.
        segments: Segment dicts with 'start'/'end' (seconds) and 'text'.
        output_dir: Directory that receives the clip WAV files.
        base_name: Filename stem used for the numbered clip names.

    Returns:
        List of record dicts with 'audio_path' (written clip path),
        'transcription' (segment text) and 'file_name' (dataset-relative
        ``audio/<name>`` path).
    """
    # sr=None keeps the file's native sample rate instead of resampling.
    waveform, sample_rate = librosa.load(audio_path, sr=None)
    records: List[dict] = []
    for index, segment in enumerate(segments):
        first = int(segment['start'] * sample_rate)
        last = int(segment['end'] * sample_rate)
        clip_name = f"{base_name}_{index + 1:05d}.wav"
        clip_path = os.path.join(output_dir, clip_name)
        sf.write(clip_path, waveform[first:last], sample_rate)
        records.append({
            'audio_path': clip_path,
            'transcription': segment['text'],
            'file_name': f"audio/{clip_name}",
        })
    return records
def save_to_parquet(records: List[dict], output_dir: str, max_size_mb: int = 500):
    """Serialize dataset records to Hugging-Face-style parquet shard(s).

    Raw audio bytes are inlined into the ``audio`` column so each parquet
    file is self-contained. Output is a single
    ``train-00000-of-00001.parquet`` when it fits under ``max_size_mb``,
    otherwise N roughly equal shards named ``train-iiiii-of-NNNNN.parquet``.

    Args:
        records: Dicts with 'audio_path', 'transcription', 'file_name' keys.
        output_dir: Directory that receives the parquet file(s).
        max_size_mb: Size threshold (MB) above which output is sharded.

    Returns:
        List of written parquet file paths.
    """
    df = pd.DataFrame(records)
    audio_bytes = []
    for path in df['audio_path']:
        with open(path, 'rb') as f:
            audio_bytes.append(f.read())
    df['audio'] = audio_bytes
    df = df[['audio', 'transcription', 'file_name']]
    # Write once to probe the on-disk size, then rename or re-shard.
    probe_path = os.path.join(output_dir, 'temp.parquet')
    df.to_parquet(probe_path, engine='pyarrow')
    file_size_mb = os.path.getsize(probe_path) / (1024 * 1024)
    parquet_files = []
    if file_size_mb <= max_size_mb:
        # Single shard: rename the probe file instead of serializing again.
        output_path = os.path.join(output_dir, 'train-00000-of-00001.parquet')
        os.replace(probe_path, output_path)
        parquet_files.append(output_path)
    else:
        os.remove(probe_path)
        num_parts = int(np.ceil(file_size_mb / max_size_mb))
        # Balanced boundaries via linspace: shard sizes differ by at most one
        # row and no shard is empty. The previous ceil-based chunk size
        # (len//parts + 1) could emit empty trailing files whose names still
        # claimed 'of-num_parts'.
        bounds = np.linspace(0, len(df), num_parts + 1, dtype=int)
        for i in range(num_parts):
            df_chunk = df.iloc[bounds[i]:bounds[i + 1]]
            output_path = os.path.join(output_dir, f'train-{i:05d}-of-{num_parts:05d}.parquet')
            df_chunk.to_parquet(output_path, engine='pyarrow')
            parquet_files.append(output_path)
    return parquet_files
def get_sample_files():
    """Return demo file paths from the bundled local ``Sample`` folder.

    Returns:
        List of paths (audio files or ZIP archives) inside ``./Sample``,
        or an empty list when the folder does not exist.
    """
    sample_dir = "Sample"
    if not os.path.exists(sample_dir):
        return []
    # ZIPs are allowed here too, unlike the pure-audio extension set above.
    allowed = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac', '.zip'}
    return [
        os.path.join(sample_dir, name)
        for name in os.listdir(sample_dir)
        if Path(name).suffix.lower() in allowed
    ]
def process_audio(input_file):
    """Gradio handler: transcribe, segment, and package the upload as a dataset.

    Pipeline: extract audio file(s) -> Whisper transcription with timestamps
    -> merge sub-2s segments -> cut clips -> parquet shard(s) -> one ZIP
    containing ``audio/`` clips plus the parquet file(s).

    Args:
        input_file: Path to the uploaded audio file or ZIP archive (str),
            or None when nothing was uploaded.

    Returns:
        Tuple of (path to the result ZIP, status message) on success, or
        (None, error message) on missing/invalid input.
    """
    if input_file is None:
        return None, "Vui lòng upload file audio hoặc file zip!"
    with tempfile.TemporaryDirectory() as temp_dir:
        extract_dir = os.path.join(temp_dir, 'extracted')
        audio_output_dir = os.path.join(temp_dir, 'audio')
        final_output_dir = os.path.join(temp_dir, 'output')
        os.makedirs(extract_dir, exist_ok=True)
        os.makedirs(audio_output_dir, exist_ok=True)
        os.makedirs(final_output_dir, exist_ok=True)
        audio_files = extract_audio_files(input_file, extract_dir)
        if not audio_files:
            return None, "Không tìm thấy file audio nào!"
        all_records = []
        for audio_file in audio_files:
            base_name = Path(audio_file).stem
            segments = transcribe_with_timestamps(audio_file)
            # Whisper often yields sub-2s fragments; merge them for TTS training.
            merged_segments = merge_short_segments(segments, min_duration=2.0)
            records = cut_audio_by_timestamps(
                audio_file,
                merged_segments,
                audio_output_dir,
                base_name
            )
            all_records.extend(records)
        parquet_files = save_to_parquet(all_records, final_output_dir)
        final_audio_dir = os.path.join(final_output_dir, 'audio')
        shutil.copytree(audio_output_dir, final_audio_dir)
        zip_path = os.path.join(temp_dir, 'dataset_output.zip')
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(final_audio_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.join('audio', file)
                    zipf.write(file_path, arcname)
            for pq_file in parquet_files:
                zipf.write(pq_file, os.path.basename(pq_file))
        # temp_dir is deleted when the 'with' block exits, so copy the result
        # out to a UNIQUE path. The previous fixed name
        # (gettempdir()/dataset_output.zip) let concurrent Gradio sessions
        # overwrite each other's downloads.
        fd, final_zip = tempfile.mkstemp(prefix='dataset_output_', suffix='.zip')
        os.close(fd)
        shutil.copy(zip_path, final_zip)
        summary = f"""
✅ Xử lý thành công!
- Số file audio đầu vào: {len(audio_files)}
- Số segment đã tạo: {len(all_records)}
- Số file parquet: {len(parquet_files)}
- File zip đầu ra: dataset_output.zip
"""
        return final_zip, summary
# ---------------------------------------------------------------------------
# Gradio UI (module level). The Blocks context is built at import time; the
# statement order inside it determines the on-screen layout. The server is
# only launched when this file is executed as a script.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Audio Transcription & Dataset Creator") as app:
    # Header / usage notes rendered above the controls (Vietnamese UI text).
    gr.Markdown("""
# 🎙️ Audio Transcription & Dataset Creator
Upload file audio hoặc file zip chứa nhiều file audio.
Hệ thống sẽ:
1. Transcribe bằng Whisper Large-v3-Turbo
2. Cắt audio theo timestamps (gộp câu ngắn)
3. Tạo dataset Parquet chuẩn với audio bytes
""")
    with gr.Row():
        with gr.Column():
            # Left column: input controls.
            input_file = gr.File(
                label="Upload Audio File hoặc ZIP",
                file_types=['.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac', '.zip']
            )
            # Sample picker is only rendered when a local ./Sample dir exists
            # at import time (see get_sample_files).
            sample_files = get_sample_files()
            if sample_files:
                gr.Markdown("### 📂 Hoặc chọn file mẫu:")
                sample_dropdown = gr.Dropdown(
                    choices=sample_files,
                    label="Chọn file mẫu",
                    interactive=True
                )
                load_sample_btn = gr.Button("📥 Load file mẫu", variant="secondary")
            process_btn = gr.Button("🚀 Bắt đầu xử lý", variant="primary")
        with gr.Column():
            # Right column: outputs (downloadable ZIP + status log).
            output_file = gr.File(label="📦 Tải về Dataset ZIP")
            status_text = gr.Textbox(label="📊 Trạng thái", lines=8)
    if sample_files:
        # Copies the chosen sample path into the file-input component.
        def load_sample(sample_path):
            return sample_path
        load_sample_btn.click(
            fn=load_sample,
            inputs=sample_dropdown,
            outputs=input_file
        )
    # Main pipeline trigger: file in, (zip, status text) out.
    process_btn.click(
        fn=process_audio,
        inputs=input_file,
        outputs=[output_file, status_text]
    )
    # Footer notes describing the produced dataset format.
    gr.Markdown("""
### 📝 Ghi chú:
- Dataset Parquet sẽ được chia nhỏ nếu > 500MB
- Cột `audio`: audio bytes (binary)
- Cột `transcription`: văn bản transcription
- Cột `file_name`: đường dẫn dạng `audio/filename_00001.wav`
- Các câu ngắn (< 2s) sẽ được gộp lại
- Format tên: train-00000-of-00007.parquet
""")
if __name__ == "__main__":
    app.launch()