import os
import sys
import importlib
import json
import asyncio
import tempfile
import textwrap
from datetime import datetime

import torch
import gradio as gr
import pydub
import edge_tts
import pysrt
from pydub import AudioSegment

# --- 1) Make sure src/ is on the Python path so ChatterboxVC can be imported ---
script_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.join(script_dir, "src")
if src_path not in sys.path:
    sys.path.insert(0, src_path)

import chatterbox.vc
importlib.reload(chatterbox.vc)
from chatterbox.vc import ChatterboxVC

# --- 2) Voice-conversion model initialisation ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
_vc_model = None


def get_vc_model():
    """Return the shared ChatterboxVC instance, loading it on first use."""
    global _vc_model
    if _vc_model is None:
        print(f"[VC] Đang tải model trên {DEVICE}…")
        _vc_model = ChatterboxVC.from_pretrained(DEVICE)
        print("[VC] Model sẵn sàng.")
    return _vc_model


# --- 3) Helper that refreshes the log, the audio player and the download list ---
global_log_messages_vc = []


def yield_vc_updates(log_msg=None, audio_data=None, file_list=None, log_append=True):
    """Yield one ``(log, audio, files)`` tuple of Gradio updates.

    Args:
        log_msg: Message to record, prefixed with the current time. ``None``
            leaves the log untouched.
        audio_data: Value for the audio output; the component is hidden when
            this is ``None``.
        file_list: Value for the file-download output; the component is
            hidden when this is ``None``.
        log_append: When true the message is appended to the module-level log,
            otherwise the log is reset to contain only this message.
    """
    global global_log_messages_vc

    if log_msg is not None:
        prefix = datetime.now().strftime("[%H:%M:%S]")
        if log_append:
            global_log_messages_vc.append(f"{prefix} {log_msg}")
        else:
            global_log_messages_vc = [f"{prefix} {log_msg}"]
    log_update = gr.update(value="\n".join(global_log_messages_vc))

    audio_update = gr.update(visible=(audio_data is not None), value=audio_data)

    files_update = gr.update(
        visible=(file_list is not None),
        value=file_list if file_list is not None else [],
    )

    yield log_update, audio_update, files_update


# --- 4) Load the Edge TTS voices from voices.json ---
def load_edge_tts_voices(json_path="voices.json"):
    """Read the voice catalogue and build the dropdown choices.

    The JSON is iterated as ``{language: {gender: [voice, ...]}}`` where each
    voice has ``display_name`` and ``voice_code`` keys.

    Args:
        json_path: Path of the catalogue. A relative path that does not exist
            in the current working directory is retried next to this script.

    Returns:
        ``(display_list, code_map)``: the display labels in file order and a
        mapping from each label to its voice code.
    """
    if not os.path.isabs(json_path) and not os.path.exists(json_path):
        # Allow launching the app from a directory other than the script's.
        beside_script = os.path.join(script_dir, json_path)
        if os.path.exists(beside_script):
            json_path = beside_script

    with open(json_path, "r", encoding="utf-8") as f:
        voices = json.load(f)

    display_list, code_map = [], {}
    for lang, genders in voices.items():
        for gender, items in genders.items():
            for v in items:
                disp = f"{lang} - {gender} - {v['display_name']} ({v['voice_code']})"
                display_list.append(disp)
                code_map[disp] = v["voice_code"]
    return display_list, code_map


edge_choices, edge_code_map = load_edge_tts_voices()

# Longest piece of text sent to Edge TTS in a single request.
_MAX_TTS_CHARS = 200

_BATCH_VALUES_ERROR = "Batch values phải là số, phân cách bởi dấu phẩy."


def _format_pct(value) -> str:
    """Format a percentage offset as a signed integer string such as ``+10%``.

    Rounds first so that a float coming from a slider does not break the
    ``d`` format code.
    """
    return f"{int(round(float(value))):+d}%"


def _resolve_voice_code(disp):
    """Translate a dropdown label into a voice code, failing with a UI error."""
    code = edge_code_map.get(disp)
    if code is None:
        raise gr.Error("Vui lòng chọn giọng Edge TTS.")
    return code


# --- 5) Edge TTS with rate & volume (for text typed directly into the UI) ---
async def _edge_tts_async(text, disp, rate_pct, vol_pct):
    """Synthesize ``text`` with the selected voice and return the output path."""
    if not text or not text.strip():
        raise gr.Error("Vui lòng nhập văn bản cho Edge TTS.")
    code = _resolve_voice_code(disp)
    # NOTE(review): the file keeps its original fixed name and .wav suffix;
    # confirm what container edge-tts actually writes before renaming it.
    out = "temp_edge_tts.wav"
    await edge_tts.Communicate(
        text, voice=code, rate=_format_pct(rate_pct), volume=_format_pct(vol_pct)
    ).save(out)
    return out


def run_edge_tts(text, disp, rate_pct, vol_pct):
    """Blocking wrapper around Edge TTS; returns the path twice (two outputs)."""
    path = asyncio.run(_edge_tts_async(text, disp, rate_pct, vol_pct))
    return path, path


# --- 6) TTS from an SRT file using pysrt, splitting text that is too long ---
async def _tts_save_segment(text: str, voice_code: str, rate_pct: int, vol_pct: int, path: str) -> bool:
    """Save one piece of text as an audio file with Edge TTS.

    Returns:
        ``True`` when audio was written, ``False`` when Edge TTS raised
        ``NoAudioReceived`` for this piece.
    """
    try:
        await edge_tts.Communicate(
            text, voice=voice_code, rate=_format_pct(rate_pct), volume=_format_pct(vol_pct)
        ).save(path)
        return True
    except edge_tts.exceptions.NoAudioReceived:
        # Nothing came back for this piece; skip it instead of failing the run.
        return False


async def _generate_audio_from_srt(
    srt_path: str,
    tmp_dir: str,
    out_path: str,
    voice_code: str,
    rate_pct: int,
    vol_pct: int
):
    """Speak every subtitle of an .srt file and export the result as WAV.

    Subtitle text longer than ``_MAX_TTS_CHARS`` is wrapped at whitespace so
    words are not cut in half; each piece is synthesized into ``tmp_dir`` and
    the pieces are concatenated in order. Subtitle timings are not used.

    Returns:
        ``True`` if at least one piece produced audio and ``out_path`` was
        written, ``False`` otherwise (``out_path`` is then left untouched).
    """
    subs = pysrt.open(srt_path, encoding='utf-8')
    combined = AudioSegment.silent(duration=0)
    produced = False

    for i, sub in enumerate(subs):
        text = sub.text.replace('\n', ' ').strip()
        if not text:
            continue

        if len(text) > _MAX_TTS_CHARS:
            parts = textwrap.wrap(text, _MAX_TTS_CHARS)
        else:
            parts = [text]

        for j, part in enumerate(parts):
            seg_path = os.path.join(tmp_dir, f"seg_{i}_{j}.wav")
            if await _tts_save_segment(part, voice_code, rate_pct, vol_pct, seg_path):
                combined += AudioSegment.from_file(seg_path)
                produced = True

    if produced:
        combined.export(out_path, format="wav")
    return produced


def synthesize_srt_audio(
    srt_path: str,
    disp_voice: str,
    work_dir: str,
    rate_pct: int,
    vol_pct: int
) -> str:
    """Synchronously build ``srt_source.wav`` in ``work_dir`` from an SRT file.

    Returns:
        Path of the generated WAV, ready to feed into the voice-clone step.

    Raises:
        gr.Error: If the voice label is unknown or no audio could be produced.
    """
    voice_code = _resolve_voice_code(disp_voice)
    out_path = os.path.join(work_dir, "srt_source.wav")

    # The per-piece files are only needed until they are concatenated, so the
    # scratch directory is removed as soon as synthesis finishes.
    with tempfile.TemporaryDirectory() as tmp_dir:
        produced = asyncio.run(
            _generate_audio_from_srt(
                srt_path, tmp_dir, out_path,
                voice_code, rate_pct, vol_pct
            )
        )

    if not produced:
        raise gr.Error("Không tạo được audio từ file SRT.")
    return out_path


# --- 7) Main voice conversion ---
def generate_vc(
    source_audio_path,
    target_voice_path,
    cfg_rate: float,
    sigma_min: float,
    batch_mode: bool,
    batch_parameter: str,
    batch_values: str
):
    """Convert the source audio to the target voice, streaming UI updates.

    In batch mode one output is produced per value in ``batch_values``; the
    value replaces the CFG rate when ``batch_parameter`` is
    ``"Inference CFG Rate"`` and ``sigma_min`` otherwise. Outside batch mode,
    sources longer than 40 s are converted in 40 s chunks and re-joined.

    Yields:
        ``(log, audio, files)`` Gradio updates; the final one carries the
        first output file and the full list of outputs.

    Raises:
        gr.Error: If no source audio was given or the batch values are empty
            or not numeric. Any other failure is logged and re-raised.
    """
    if not source_audio_path:
        raise gr.Error("Vui lòng cung cấp audio nguồn.")

    model = get_vc_model()
    yield from yield_vc_updates("Khởi tạo chuyển giọng…", log_append=False)

    # Outputs are grouped in one folder per day.
    date_folder = datetime.now().strftime("%Y%m%d")
    work_dir = os.path.join("outputs/vc", date_folder)
    os.makedirs(work_dir, exist_ok=True)

    def run_once(src, tgt, rate, sigma):
        return model.generate(src, target_voice_path=tgt, inference_cfg_rate=rate, sigma_min=sigma)

    outputs = []
    try:
        if batch_mode:
            try:
                vals = [float(v.strip()) for v in (batch_values or "").split(",") if v.strip()]
            except ValueError:
                raise gr.Error(_BATCH_VALUES_ERROR) from None
            if not vals:
                raise gr.Error(_BATCH_VALUES_ERROR)

            yield from yield_vc_updates(f"Chạy batch '{batch_parameter}': {vals}")
            for idx, v in enumerate(vals, 1):
                r, s = cfg_rate, sigma_min
                if batch_parameter == "Inference CFG Rate":
                    r, tag = v, f"cfg_{v}"
                else:
                    s, tag = v, f"sigma_{v}"
                yield from yield_vc_updates(f" • Mục {idx}/{len(vals)}: {batch_parameter}={v}")
                wav = run_once(source_audio_path, target_voice_path, r, s)
                path = os.path.join(work_dir, f"{tag}_{idx}.wav")
                model.save_wav(wav, path)
                outputs.append(path)
                yield from yield_vc_updates(f"Đã lưu: {path}")
        else:
            audio = pydub.AudioSegment.from_file(source_audio_path)
            if len(audio) > 40_000:
                yield from yield_vc_updates("Audio dài >40s: tách thành đoạn 40s…")
                chunks = [audio[i:i+40_000] for i in range(0, len(audio), 40_000)]
                temp_paths = []
                for i, chunk in enumerate(chunks):
                    tmp = f"{source_audio_path}_chunk{i}.wav"
                    try:
                        chunk.export(tmp, format="wav")
                        wav = run_once(tmp, target_voice_path, cfg_rate, sigma_min)
                    finally:
                        # Never leave the chunk file behind, even on failure.
                        if os.path.exists(tmp):
                            os.remove(tmp)
                    outp = os.path.join(work_dir, f"part{i}.wav")
                    model.save_wav(wav, outp)
                    temp_paths.append(outp)
                    yield from yield_vc_updates(f"Xử lý đoạn {i+1}/{len(chunks)}")

                # Stitch the converted parts back together.
                combined = AudioSegment.empty()
                for p in temp_paths:
                    combined += AudioSegment.from_file(p)
                final = os.path.join(work_dir, "combined.wav")
                combined.export(final, format="wav")
                outputs.append(final)
                yield from yield_vc_updates("Chuyển xong.")
            else:
                yield from yield_vc_updates("Đang chuyển giọng…")
                wav = run_once(source_audio_path, target_voice_path, cfg_rate, sigma_min)
                outp = os.path.join(work_dir, f"LyTranTTS_{datetime.now().strftime('%H%M%S')}.wav")
                model.save_wav(wav, outp)
                outputs.append(outp)
                yield from yield_vc_updates("Hoàn thành.")
    except Exception as e:
        # Surface the failure in the log box, then let Gradio report it.
        yield from yield_vc_updates(f"Lỗi: {e}")
        raise

    # Hand back the first audio for playback and every file for download.
    first = outputs[0] if outputs else None
    yield from yield_vc_updates(log_msg=None, audio_data=first, file_list=outputs)
# --- 8) Combined wrapper ---
def run_vc_from_srt_or_file(
    use_srt: bool,
    srt_file, srt_voice, srt_rate, srt_vol,
    edge_text, edge_voice, edge_rate, edge_vol,
    src_audio, tgt_audio,
    cfg_rate, sigma_min,
    batch_mode, batch_parameter, batch_values
):
    """Pick the source audio, then run the voice conversion.

    The source is, in order of precedence: speech synthesized from the
    uploaded SRT file (when ``use_srt`` is set), speech synthesized from
    ``edge_text`` (when both text and voice are given), or ``src_audio``.

    Yields:
        ``(log, audio, files)`` Gradio updates, including everything yielded
        by ``generate_vc``.

    Raises:
        gr.Error: If SRT mode is selected but no .srt file was uploaded.
    """
    yield from yield_vc_updates("Bắt đầu…", log_append=False)
    date_folder = datetime.now().strftime("%Y%m%d")
    work_dir = os.path.join("outputs/vc", date_folder)
    os.makedirs(work_dir, exist_ok=True)

    if use_srt:
        if srt_file is None:
            raise gr.Error("Vui lòng tải lên file .srt.")
        # The upload may arrive as a plain path string or as an object
        # exposing the path through ``.name``.
        srt_path = srt_file if isinstance(srt_file, str) else srt_file.name
        yield from yield_vc_updates("Sinh audio từ SRT…")
        source = synthesize_srt_audio(
            srt_path, srt_voice, work_dir,
            rate_pct=int(round(srt_rate)), vol_pct=int(round(srt_vol))
        )
    elif edge_text and edge_voice:
        yield from yield_vc_updates("Sinh audio từ Edge TTS…")
        tmp, _ = run_edge_tts(
            edge_text, edge_voice, int(round(edge_rate)), int(round(edge_vol))
        )
        source = tmp
    else:
        source = src_audio

    yield from generate_vc(
        source, tgt_audio,
        cfg_rate, sigma_min,
        batch_mode, batch_parameter, batch_values
    )


# --- 9) Build the Gradio UI ---
with gr.Blocks(title="Chuyển Giọng Nói AI") as demo:
    gr.Markdown("## 📣 Chuyển Giọng Nói AI")
    gr.Markdown("> Tác giả: **Lý Trần**")

    with gr.Row():
        with gr.Column():
            # SRT source
            use_srt = gr.Checkbox(label="Sử dụng file SRT làm nguồn?", value=False)
            srt_file = gr.File(file_types=[".srt"], label="Tải lên file .srt", visible=False)
            srt_voice = gr.Dropdown(choices=edge_choices, label="Giọng Edge TTS (SRT)", visible=False)
            srt_rate = gr.Slider(-100, 100, value=0, step=1, label="Tốc độ SRT (% chuẩn)", visible=False)
            srt_vol = gr.Slider(-100, 100, value=0, step=1, label="Âm lượng SRT (% chuẩn)", visible=False)

            # Edge TTS source
            use_edge = gr.Checkbox(label="Tạo nguồn qua Edge TTS?", value=False)
            edge_text = gr.Textbox(label="Văn bản cho Edge TTS", visible=False)
            edge_voice = gr.Dropdown(choices=edge_choices, label="Giọng Edge TTS", visible=False)
            edge_rate = gr.Slider(-100, 100, value=0, step=1, label="Tốc độ Edge (% chuẩn)", visible=False)
            edge_vol = gr.Slider(-100, 100, value=0, step=1, label="Âm lượng Edge (% chuẩn)", visible=False)
            gen_edge_btn = gr.Button("🗣️ Tạo Edge TTS", visible=False)
            edge_audio = gr.Audio(label="Nguồn Edge TTS", type="filepath", visible=False)

            # Manually supplied source
            src_audio = gr.Audio(sources=["upload","microphone"], type="filepath", label="Tải lên / Ghi âm nguồn")

            # Reference (target) voice
            gr.Markdown("### Giọng tham chiếu (mục tiêu)")
            tgt_audio = gr.Audio(sources=["upload","microphone"], type="filepath", label="Tải lên / Ghi âm giọng mục tiêu")

            # Conversion parameters
            gr.Markdown("### Tham số chuyển giọng")
            cfg_slider = gr.Slider(0.0, 30.0, value=0.5, step=0.1, label="CFG Rate")
            sigma_input = gr.Number(1e-6, label="Sigma Min", minimum=1e-7, maximum=1e-5, step=1e-7)

            # Batch sweep
            with gr.Accordion("Tùy chọn Batch Sweep", open=False):
                batch_chk = gr.Checkbox(label="Kích hoạt Batch Sweep", value=False)
                batch_param = gr.Dropdown(choices=["Inference CFG Rate","Sigma Min"], label="Tham số thay đổi")
                batch_vals = gr.Textbox(placeholder="ví dụ: 0.5,1.0,2.0", label="Giá trị phân cách dấu phẩy")

            run_btn = gr.Button("🚀 Chuyển giọng")

        with gr.Column():
            gr.Markdown("### Nhật ký")
            log_box = gr.Textbox(interactive=False, lines=12)
            gr.Markdown("### Kết quả")
            out_audio = gr.Audio(label="Âm thanh kết quả", type="filepath", visible=False)
            out_files = gr.Files(label="Tải xuống file đầu ra", visible=False)

    # Toggle SRT mode
    def toggle_srt(v, edge_on=False):
        """Return visibility updates for the SRT checkbox change.

        SRT widgets follow ``v``. When SRT mode is off, the Edge TTS widgets
        follow the "use Edge TTS" checkbox (``edge_on``) instead of all being
        shown, and the manual source is shown only when Edge TTS is off.
        """
        show_edge = (not v) and bool(edge_on)
        show_manual = (not v) and not edge_on
        return (
            gr.update(visible=v),            # srt_file
            gr.update(visible=v),            # srt_voice
            gr.update(visible=v),            # srt_rate
            gr.update(visible=v),            # srt_vol
            gr.update(visible=not v),        # use_edge
            gr.update(visible=show_edge),    # edge_text
            gr.update(visible=show_edge),    # edge_voice
            gr.update(visible=show_edge),    # edge_rate
            gr.update(visible=show_edge),    # edge_vol
            gr.update(visible=show_edge),    # gen_edge_btn
            gr.update(visible=show_edge),    # edge_audio
            gr.update(visible=show_manual),  # src_audio
        )

    use_srt.change(
        fn=toggle_srt,
        inputs=[use_srt, use_edge],
        outputs=[
            srt_file, srt_voice, srt_rate, srt_vol,
            use_edge, edge_text, edge_voice, edge_rate, edge_vol,
            gen_edge_btn, edge_audio, src_audio
        ]
    )

    # Toggle Edge TTS mode
    def toggle_edge(v):
        """Show the Edge TTS widgets and hide the manual source, or vice versa."""
        return (
            gr.update(visible=v),      # edge_text
            gr.update(visible=v),      # edge_voice
            gr.update(visible=v),      # edge_rate
            gr.update(visible=v),      # edge_vol
            gr.update(visible=v),      # gen_edge_btn
            gr.update(visible=v),      # edge_audio
            gr.update(visible=not v)   # src_audio
        )

    use_edge.change(
        fn=toggle_edge,
        inputs=[use_edge],
        outputs=[edge_text, edge_voice, edge_rate, edge_vol, gen_edge_btn, edge_audio, src_audio]
    )

    # Generate Edge TTS audio on demand
    gen_edge_btn.click(
        fn=run_edge_tts,
        inputs=[edge_text, edge_voice, edge_rate, edge_vol],
        outputs=[edge_audio, src_audio]
    )

    # Run the voice conversion
    run_btn.click(
        fn=run_vc_from_srt_or_file,
        inputs=[
            use_srt, srt_file, srt_voice, srt_rate, srt_vol,
            edge_text, edge_voice, edge_rate, edge_vol,
            src_audio, tgt_audio,
            cfg_slider, sigma_input,
            batch_chk, batch_param, batch_vals
        ],
        outputs=[log_box, out_audio, out_files],
        show_progress="minimal"
    )

if __name__ == "__main__":
    demo.launch(share=True)