import gradio as gr import soundfile as sf import tempfile import os import time import numpy as np import librosa import re import json import shutil from pathlib import Path from datetime import datetime from llama_cpp import Llama from neucodec import NeuCodecOnnxDecoder import torch from utils.phonemize_text import phonemize_with_dict import threading from queue import Queue print("⏳ Đang khởi động VieNeu-TTS...") # --- CONSTANTS --- MAX_CHARS_PER_CHUNK = 256 SAMPLE_RATE = 24000 DEVICE_INFO = "Q4 GGUF (llama-cpp) + ONNX Codec" # Thư mục lưu lịch sử - sử dụng thư mục tạm nếu không có quyền ghi try: HISTORY_DIR = "./tts_history" os.makedirs(HISTORY_DIR, exist_ok=True) # Test write permission test_file = os.path.join(HISTORY_DIR, ".test") with open(test_file, 'w') as f: f.write("test") os.remove(test_file) except (PermissionError, OSError): # Fallback to temp directory HISTORY_DIR = os.path.join(tempfile.gettempdir(), "vieneu_tts_history") os.makedirs(HISTORY_DIR, exist_ok=True) print(f"⚠️ Không có quyền ghi, sử dụng thư mục tạm: {HISTORY_DIR}") HISTORY_JSON = os.path.join(HISTORY_DIR, "history.json") # Đường dẫn model BACKBONE_REPO = "pnnbao-ump/VieNeu-TTS-q8-gguf" CODEC_REPO = "neuphonic/neucodec-onnx-decoder" # Giọng mẫu VOICE_SAMPLES = { "Tuyên (nam miền Bắc)": { "audio": "./sample/Tuyên (nam miền Bắc).wav", "text": "./sample/Tuyên (nam miền Bắc).txt", "codes": "./sample/Tuyên (nam miền Bắc).pt" }, "Vĩnh (nam miền Nam)": { "audio": "./sample/Vĩnh (nam miền Nam).wav", "text": "./sample/Vĩnh (nam miền Nam).txt", "codes": "./sample/Vĩnh (nam miền Nam).pt" }, "Bình (nam miền Bắc)": { "audio": "./sample/Bình (nam miền Bắc).wav", "text": "./sample/Bình (nam miền Bắc).txt", "codes": "./sample/Bình (nam miền Bắc).pt" }, "Nguyên (nam miền Nam)": { "audio": "./sample/Nguyên (nam miền Nam).wav", "text": "./sample/Nguyên (nam miền Nam).txt", "codes": "./sample/Nguyên (nam miền Nam).pt" }, "Sơn (nam miền Nam)": { "audio": "./sample/Sơn (nam miền Nam).wav", "text": "./sample/Sơn (nam miền Nam).txt", "codes": "./sample/Sơn (nam miền Nam).pt" }, "Đoan (nữ miền Nam)": { "audio": "./sample/Đoan (nữ miền Nam).wav", "text": "./sample/Đoan (nữ miền Nam).txt", "codes": "./sample/Đoan (nữ miền Nam).pt" }, "Ngọc (nữ miền Bắc)": { "audio": "./sample/Ngọc (nữ miền Bắc).wav", "text": "./sample/Ngọc (nữ miền Bắc).txt", "codes": "./sample/Ngọc (nữ miền Bắc).pt" }, "Ly (nữ miền Bắc)": { "audio": "./sample/Ly (nữ miền Bắc).wav", "text": "./sample/Ly (nữ miền Bắc).txt", "codes": "./sample/Ly (nữ miền Bắc).pt" }, "Dung (nữ miền Nam)": { "audio": "./sample/Dung (nữ miền Nam).wav", "text": "./sample/Dung (nữ miền Nam).txt", "codes": "./sample/Dung (nữ miền Nam).pt" } } # --- HISTORY MANAGEMENT --- def load_history(): """Tải lịch sử từ file JSON""" if os.path.exists(HISTORY_JSON): try: with open(HISTORY_JSON, 'r', encoding='utf-8') as f: return json.load(f) except: return [] return [] def save_history(history): """Lưu lịch sử vào file JSON""" with open(HISTORY_JSON, 'w', encoding='utf-8') as f: json.dump(history, f, ensure_ascii=False, indent=2) def add_to_history(text, voice, audio_path, duration, status): """Thêm một bản ghi vào lịch sử""" try: history = load_history() # Tạo tên file duy nhất timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"tts_{timestamp}.wav" permanent_path = os.path.join(HISTORY_DIR, filename) # Copy file audio vào thư mục lịch sử if audio_path and os.path.exists(audio_path): try: shutil.copy2(audio_path, permanent_path) except Exception as e: print(f"⚠️ Không thể copy file audio: {e}") permanent_path = audio_path # Dùng file tạm nếu không copy được record = { "id": timestamp, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "text": text[:100] + "..." if len(text) > 100 else text, "full_text": text, "voice": voice, "audio_path": permanent_path, "duration": duration, "status": status } history.insert(0, record) # Thêm vào đầu danh sách # Giới hạn lịch sử tối đa 100 bản ghi if len(history) > 100: # Xóa file audio cũ old_record = history.pop() try: if os.path.exists(old_record['audio_path']): os.remove(old_record['audio_path']) except: pass save_history(history) return permanent_path except Exception as e: print(f"⚠️ Lỗi khi lưu lịch sử: {e}") return audio_path if audio_path else None def get_history_list(): """Lấy danh sách lịch sử để hiển thị""" history = load_history() if not history: return "Chưa có lịch sử tổng hợp nào." output = [] for i, record in enumerate(history[:50], 1): # Hiển thị 50 bản ghi gần nhất output.append(f"**{i}. [{record['timestamp']}]** - {record['voice']}") output.append(f" 📝 {record['text']}") output.append(f" ⏱️ Thời lượng: {record['duration']:.2f}s | {record['status']}") output.append("") return "\n".join(output) def delete_history_item(item_id): """Xóa một item khỏi lịch sử""" history = load_history() history = [h for h in history if h['id'] != item_id] save_history(history) # --- BACKGROUND PROCESSING QUEUE --- processing_queue = Queue() is_processing = False def background_processor(): """Xử lý queue tổng hợp trong background""" global is_processing while True: task = processing_queue.get() if task is None: break is_processing = True text, voice = task try: print(f"[Background] Bắt đầu tổng hợp: {text[:50]}...") result = synthesize_speech_internal(text, voice) print(f"[Background] Hoàn thành: {result}") except Exception as e: print(f"[Background] Lỗi: {e}") is_processing = False processing_queue.task_done() # Khởi động background thread bg_thread = threading.Thread(target=background_processor, daemon=True) bg_thread.start() def split_text_into_chunks(text, max_chars=256): """Chia text thành chunks""" sentences = re.split(r'([.!?,;])', text) chunks = [] current = "" for i in range(0, len(sentences), 2): sentence = sentences[i] punct = sentences[i+1] if i+1 < len(sentences) else "" segment = sentence + punct if len(current) + len(segment) <= max_chars: current += segment else: if current: chunks.append(current.strip()) current = segment if current: chunks.append(current.strip()) return chunks if chunks else [text] def decode_audio(codes_str, codec): """Decode speech tokens to audio""" speech_ids = [int(num) for num in re.findall(r"<\|speech_(\d+)\|>", codes_str)] if len(speech_ids) == 0: print("Không tìm thấy mã giọng nói hợp lệ.") return np.array([], dtype=np.float32) codes = np.array(speech_ids, dtype=np.int32)[np.newaxis, np.newaxis, :] recon = codec.decode_code(codes) return recon[0, 0, :] # --- MODEL LOADING --- print("📦 Đang tải model Q4 GGUF và Codec ONNX...") model_loaded = False backbone = None codec = None try: backbone = Llama.from_pretrained( repo_id=BACKBONE_REPO, filename="*.gguf", verbose=False, n_gpu_layers=-1, n_ctx=2048, mlock=True, flash_attn=True, ) codec = NeuCodecOnnxDecoder.from_pretrained(CODEC_REPO) print("✅ Model đã tải thành công!") model_loaded = True except Exception as e: import traceback traceback.print_exc() print(f"❌ Lỗi khi tải model: {e}") model_loaded = False # --- SYNTHESIS FUNCTION (Internal) --- def synthesize_speech_internal(text, voice_choice): """Internal synthesis function không phụ thuộc UI""" global backbone, codec, model_loaded if not model_loaded: return None if not text or text.strip() == "": return None if voice_choice not in VOICE_SAMPLES: return None raw_text = text.strip() # Load reference text ref_text_path = VOICE_SAMPLES[voice_choice]["text"] try: with open(ref_text_path, "r", encoding="utf-8") as f: ref_text_raw = f.read() except Exception as e: print(f"❌ Lỗi đọc file text mẫu: {e}") return None # Tải codes ref_codes = None codes_path = VOICE_SAMPLES[voice_choice]["codes"] try: ref_codes_tensor = torch.load(codes_path, map_location="cpu") if isinstance(ref_codes_tensor, torch.Tensor): ref_codes = ref_codes_tensor.cpu().numpy() else: ref_codes = np.array(ref_codes_tensor) except Exception as e: print(f"❌ Lỗi khi tải codes: {e}") return None if ref_codes is None or len(ref_codes) == 0: return None # Split text text_chunks = split_text_into_chunks(raw_text, max_chars=MAX_CHARS_PER_CHUNK) all_audio_segments = [] silence_pad = np.zeros(int(SAMPLE_RATE * 0.15), dtype=np.float32) start_time = time.time() try: for i, chunk in enumerate(text_chunks): # Phonemize ref_text_phoneme = phonemize_with_dict(ref_text_raw) input_text_phoneme = phonemize_with_dict(chunk) # Create prompt codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes]) prompt = ( f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text_phoneme} {input_text_phoneme}" f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}" ) # Generate output = backbone( prompt, max_tokens=2048, temperature=1.0, top_k=50, stop=["<|SPEECH_GENERATION_END|>"], ) output_str = output["choices"][0]["text"] # Decode - CHỈ DECODE PHẦN OUTPUT (không bao gồm reference codes) chunk_wav = decode_audio(output_str, codec) if chunk_wav is not None and len(chunk_wav) > 0: all_audio_segments.append(chunk_wav) if i < len(text_chunks) - 1: all_audio_segments.append(silence_pad) if not all_audio_segments: return None final_wav = np.concatenate(all_audio_segments) with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: sf.write(tmp.name, final_wav, SAMPLE_RATE) output_path = tmp.name process_time = time.time() - start_time # Lưu vào lịch sử permanent_path = add_to_history(raw_text, voice_choice, output_path, process_time, "Thành công") return permanent_path except Exception as e: import traceback traceback.print_exc() return None # --- SYNTHESIS FUNCTION (UI) --- def synthesize_speech(text, voice_choice): """Main synthesis function với UI feedback""" global backbone, codec, model_loaded if not model_loaded: yield None, "⚠️ Model chưa tải. Vui lòng kiểm tra lỗi console!" return if not text or text.strip() == "": yield None, "⚠️ Vui lòng nhập văn bản!" return if voice_choice not in VOICE_SAMPLES: yield None, "⚠️ Vui lòng chọn giọng mẫu." return raw_text = text.strip() # Load reference text ref_text_path = VOICE_SAMPLES[voice_choice]["text"] try: with open(ref_text_path, "r", encoding="utf-8") as f: ref_text_raw = f.read() except Exception as e: yield None, f"❌ Lỗi đọc file text mẫu: {e}" return yield None, "📄 Đang xử lý Reference..." # Tải codes ref_codes = None codes_path = VOICE_SAMPLES[voice_choice]["codes"] try: ref_codes_tensor = torch.load(codes_path, map_location="cpu") if isinstance(ref_codes_tensor, torch.Tensor): ref_codes = ref_codes_tensor.cpu().numpy() else: ref_codes = np.array(ref_codes_tensor) except Exception as e: yield None, f"❌ Lỗi khi tải codes: {e}" return if ref_codes is None or len(ref_codes) == 0: yield None, "❌ Codes tham chiếu không hợp lệ." return # Split text text_chunks = split_text_into_chunks(raw_text, max_chars=MAX_CHARS_PER_CHUNK) total_chunks = len(text_chunks) yield None, f"🚀 Bắt đầu tổng hợp ({total_chunks} đoạn)..." all_audio_segments = [] silence_pad = np.zeros(int(SAMPLE_RATE * 0.15), dtype=np.float32) start_time = time.time() try: for i, chunk in enumerate(text_chunks): yield None, f"⏳ Đang xử lý đoạn {i+1}/{total_chunks}..." # Phonemize ref_text_phoneme = phonemize_with_dict(ref_text_raw) input_text_phoneme = phonemize_with_dict(chunk) # Create prompt codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes]) prompt = ( f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text_phoneme} {input_text_phoneme}" f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}" ) # Generate output = backbone( prompt, max_tokens=2048, temperature=1.0, top_k=50, stop=["<|SPEECH_GENERATION_END|>"], ) output_str = output["choices"][0]["text"] # Decode - CHỈ DECODE PHẦN OUTPUT (không bao gồm reference codes) chunk_wav = decode_audio(output_str, codec) if chunk_wav is not None and len(chunk_wav) > 0: all_audio_segments.append(chunk_wav) if i < total_chunks - 1: all_audio_segments.append(silence_pad) if not all_audio_segments: yield None, "❌ Không sinh được audio nào." return yield None, "💾 Đang ghép file và lưu..." final_wav = np.concatenate(all_audio_segments) with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: sf.write(tmp.name, final_wav, SAMPLE_RATE) output_path = tmp.name process_time = time.time() - start_time # Lưu vào lịch sử permanent_path = add_to_history(raw_text, voice_choice, output_path, process_time, "Thành công") yield permanent_path, f"✅ Hoàn tất! (Tổng thời gian: {process_time:.2f}s)" except Exception as e: import traceback traceback.print_exc() add_to_history(raw_text, voice_choice, None, 0, f"Lỗi: {str(e)}") yield None, f"❌ Lỗi tổng hợp: {str(e)}" def refresh_history(): """Làm mới danh sách lịch sử""" return get_history_list() def load_history_item(history_index): """Tải một item từ lịch sử""" if not history_index: return None, "", "", "" try: index = int(history_index.split(".")[0]) - 1 history = load_history() if 0 <= index < len(history): record = history[index] return ( record['audio_path'] if os.path.exists(record['audio_path']) else None, record['full_text'], record['voice'], f"📅 {record['timestamp']} | ⏱️ {record['duration']:.2f}s" ) except: pass return None, "", "", "" # --- UI SETUP --- theme = gr.themes.Ocean( primary_hue="indigo", secondary_hue="cyan", neutral_hue="slate", font=[gr.themes.GoogleFont('Inter'), 'ui-sans-serif', 'system-ui'], ).set( button_primary_background_fill="linear-gradient(90deg, #6366f1 0%, #0ea5e9 100%)", button_primary_background_fill_hover="linear-gradient(90deg, #4f46e5 0%, #0284c7 100%)", ) css = """ .container { max-width: 1400px; margin: auto; } .header-box { text-align: center; margin-bottom: 25px; padding: 25px; background: linear-gradient(135deg, #0f172a 0%, #1e293b 100%); border-radius: 12px; color: white; } .header-title { font-size: 2.5rem; font-weight: 800; } .gradient-text { background: -webkit-linear-gradient(45deg, #60A5FA, #22D3EE); -webkit-background-clip: text; -webkit-text-fill-color: transparent; } .status-box { font-weight: bold; text-align: center; border: none; background: transparent; } .model-card-content { display: flex; flex-wrap: wrap; justify-content: center; align-items: center; gap: 15px; font-size: 0.9rem; color: #cbd5e1; } .model-card-item { display: flex; align-items: center; justify-content: center; gap: 6px; color: #94a3b8; } .model-card-link { color: #3b82f6; text-decoration: none; font-weight: 500; transition: color 0.2s; } .model-card-link:hover { color: #2563eb; text-decoration: underline; } .history-list { max-height: 600px; overflow-y: auto; padding: 15px; background: #f8fafc; border-radius: 8px; } """ EXAMPLES_LIST = [ ["Về miền Tây không chỉ để ngắm nhìn sông nước hữu tình, mà còn để cảm nhận tấm chân tình của người dân nơi đây.", "Vĩnh (nam miền Nam)"], ["Hà Nội những ngày vào thu mang một vẻ đẹp trầm mặc và cổ kính đến lạ thường.", "Bình (nam miền Bắc)"], ["Thành phố Hồ Chí Minh luôn chuyển mình không ngừng với nhịp sống hối hả, năng động.", "Dung (nữ miền Nam)"], ] initial_status = f"✅ Model đã tải thành công! (Chạy trên **{DEVICE_INFO}**). Hỗ trợ xử lý background và lưu lịch sử." if model_loaded else "❌ Lỗi khi tải model." with gr.Blocks(title="VieNeu-TTS", theme=theme, css=css) as demo: with gr.Column(elem_classes="container"): gr.HTML(f"""
Chế độ: {DEVICE_INFO} | Background Processing ✅ | History ✅
📁 Lịch sử lưu tại: {HISTORY_DIR}