Arrcttacsrks's picture
Upload app-12.py
9f84367 verified
import gradio as gr
import soundfile as sf
import tempfile
import os
import time
import numpy as np
import librosa
import re
import json
import shutil
from pathlib import Path
from datetime import datetime
from llama_cpp import Llama
from neucodec import NeuCodecOnnxDecoder
import torch
from utils.phonemize_text import phonemize_with_dict
import threading
from queue import Queue
print("⏳ Đang khởi động VieNeu-TTS...")
# --- CONSTANTS ---
MAX_CHARS_PER_CHUNK = 256
SAMPLE_RATE = 24000
DEVICE_INFO = "Q4 GGUF (llama-cpp) + ONNX Codec"
# Thư mục lưu lịch sử - sử dụng thư mục tạm nếu không có quyền ghi
try:
HISTORY_DIR = "./tts_history"
os.makedirs(HISTORY_DIR, exist_ok=True)
# Test write permission
test_file = os.path.join(HISTORY_DIR, ".test")
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
except (PermissionError, OSError):
# Fallback to temp directory
HISTORY_DIR = os.path.join(tempfile.gettempdir(), "vieneu_tts_history")
os.makedirs(HISTORY_DIR, exist_ok=True)
print(f"⚠️ Không có quyền ghi, sử dụng thư mục tạm: {HISTORY_DIR}")
HISTORY_JSON = os.path.join(HISTORY_DIR, "history.json")
# Đường dẫn model
BACKBONE_REPO = "pnnbao-ump/VieNeu-TTS-q8-gguf"
CODEC_REPO = "neuphonic/neucodec-onnx-decoder"
# Giọng mẫu
VOICE_SAMPLES = {
"Tuyên (nam miền Bắc)": {
"audio": "./sample/Tuyên (nam miền Bắc).wav",
"text": "./sample/Tuyên (nam miền Bắc).txt",
"codes": "./sample/Tuyên (nam miền Bắc).pt"
},
"Vĩnh (nam miền Nam)": {
"audio": "./sample/Vĩnh (nam miền Nam).wav",
"text": "./sample/Vĩnh (nam miền Nam).txt",
"codes": "./sample/Vĩnh (nam miền Nam).pt"
},
"Bình (nam miền Bắc)": {
"audio": "./sample/Bình (nam miền Bắc).wav",
"text": "./sample/Bình (nam miền Bắc).txt",
"codes": "./sample/Bình (nam miền Bắc).pt"
},
"Nguyên (nam miền Nam)": {
"audio": "./sample/Nguyên (nam miền Nam).wav",
"text": "./sample/Nguyên (nam miền Nam).txt",
"codes": "./sample/Nguyên (nam miền Nam).pt"
},
"Sơn (nam miền Nam)": {
"audio": "./sample/Sơn (nam miền Nam).wav",
"text": "./sample/Sơn (nam miền Nam).txt",
"codes": "./sample/Sơn (nam miền Nam).pt"
},
"Đoan (nữ miền Nam)": {
"audio": "./sample/Đoan (nữ miền Nam).wav",
"text": "./sample/Đoan (nữ miền Nam).txt",
"codes": "./sample/Đoan (nữ miền Nam).pt"
},
"Ngọc (nữ miền Bắc)": {
"audio": "./sample/Ngọc (nữ miền Bắc).wav",
"text": "./sample/Ngọc (nữ miền Bắc).txt",
"codes": "./sample/Ngọc (nữ miền Bắc).pt"
},
"Ly (nữ miền Bắc)": {
"audio": "./sample/Ly (nữ miền Bắc).wav",
"text": "./sample/Ly (nữ miền Bắc).txt",
"codes": "./sample/Ly (nữ miền Bắc).pt"
},
"Dung (nữ miền Nam)": {
"audio": "./sample/Dung (nữ miền Nam).wav",
"text": "./sample/Dung (nữ miền Nam).txt",
"codes": "./sample/Dung (nữ miền Nam).pt"
}
}
# --- HISTORY MANAGEMENT ---
def load_history():
"""Tải lịch sử từ file JSON"""
if os.path.exists(HISTORY_JSON):
try:
with open(HISTORY_JSON, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return []
return []
def save_history(history):
"""Lưu lịch sử vào file JSON"""
with open(HISTORY_JSON, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=2)
def add_to_history(text, voice, audio_path, duration, status):
"""Thêm một bản ghi vào lịch sử"""
try:
history = load_history()
# Tạo tên file duy nhất
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"tts_{timestamp}.wav"
permanent_path = os.path.join(HISTORY_DIR, filename)
# Copy file audio vào thư mục lịch sử
if audio_path and os.path.exists(audio_path):
try:
shutil.copy2(audio_path, permanent_path)
except Exception as e:
print(f"⚠️ Không thể copy file audio: {e}")
permanent_path = audio_path # Dùng file tạm nếu không copy được
record = {
"id": timestamp,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"text": text[:100] + "..." if len(text) > 100 else text,
"full_text": text,
"voice": voice,
"audio_path": permanent_path,
"duration": duration,
"status": status
}
history.insert(0, record) # Thêm vào đầu danh sách
# Giới hạn lịch sử tối đa 100 bản ghi
if len(history) > 100:
# Xóa file audio cũ
old_record = history.pop()
try:
if os.path.exists(old_record['audio_path']):
os.remove(old_record['audio_path'])
except:
pass
save_history(history)
return permanent_path
except Exception as e:
print(f"⚠️ Lỗi khi lưu lịch sử: {e}")
return audio_path if audio_path else None
def get_history_list():
"""Lấy danh sách lịch sử để hiển thị"""
history = load_history()
if not history:
return "Chưa có lịch sử tổng hợp nào."
output = []
for i, record in enumerate(history[:50], 1): # Hiển thị 50 bản ghi gần nhất
output.append(f"**{i}. [{record['timestamp']}]** - {record['voice']}")
output.append(f" 📝 {record['text']}")
output.append(f" ⏱️ Thời lượng: {record['duration']:.2f}s | {record['status']}")
output.append("")
return "\n".join(output)
def delete_history_item(item_id):
"""Xóa một item khỏi lịch sử"""
history = load_history()
history = [h for h in history if h['id'] != item_id]
save_history(history)
# --- BACKGROUND PROCESSING QUEUE ---
processing_queue = Queue()
is_processing = False
def background_processor():
"""Xử lý queue tổng hợp trong background"""
global is_processing
while True:
task = processing_queue.get()
if task is None:
break
is_processing = True
text, voice = task
try:
print(f"[Background] Bắt đầu tổng hợp: {text[:50]}...")
result = synthesize_speech_internal(text, voice)
print(f"[Background] Hoàn thành: {result}")
except Exception as e:
print(f"[Background] Lỗi: {e}")
is_processing = False
processing_queue.task_done()
# Khởi động background thread
bg_thread = threading.Thread(target=background_processor, daemon=True)
bg_thread.start()
def split_text_into_chunks(text, max_chars=256):
"""Chia text thành chunks"""
sentences = re.split(r'([.!?,;])', text)
chunks = []
current = ""
for i in range(0, len(sentences), 2):
sentence = sentences[i]
punct = sentences[i+1] if i+1 < len(sentences) else ""
segment = sentence + punct
if len(current) + len(segment) <= max_chars:
current += segment
else:
if current:
chunks.append(current.strip())
current = segment
if current:
chunks.append(current.strip())
return chunks if chunks else [text]
def decode_audio(codes_str, codec):
"""Decode speech tokens to audio"""
speech_ids = [int(num) for num in re.findall(r"<\|speech_(\d+)\|>", codes_str)]
if len(speech_ids) == 0:
print("Không tìm thấy mã giọng nói hợp lệ.")
return np.array([], dtype=np.float32)
codes = np.array(speech_ids, dtype=np.int32)[np.newaxis, np.newaxis, :]
recon = codec.decode_code(codes)
return recon[0, 0, :]
# --- MODEL LOADING ---
print("📦 Đang tải model Q4 GGUF và Codec ONNX...")
model_loaded = False
backbone = None
codec = None
try:
backbone = Llama.from_pretrained(
repo_id=BACKBONE_REPO,
filename="*.gguf",
verbose=False,
n_gpu_layers=-1,
n_ctx=2048,
mlock=True,
flash_attn=True,
)
codec = NeuCodecOnnxDecoder.from_pretrained(CODEC_REPO)
print("✅ Model đã tải thành công!")
model_loaded = True
except Exception as e:
import traceback
traceback.print_exc()
print(f"❌ Lỗi khi tải model: {e}")
model_loaded = False
# --- SYNTHESIS FUNCTION (Internal) ---
def synthesize_speech_internal(text, voice_choice):
"""Internal synthesis function không phụ thuộc UI"""
global backbone, codec, model_loaded
if not model_loaded:
return None
if not text or text.strip() == "":
return None
if voice_choice not in VOICE_SAMPLES:
return None
raw_text = text.strip()
# Load reference text
ref_text_path = VOICE_SAMPLES[voice_choice]["text"]
try:
with open(ref_text_path, "r", encoding="utf-8") as f:
ref_text_raw = f.read()
except Exception as e:
print(f"❌ Lỗi đọc file text mẫu: {e}")
return None
# Tải codes
ref_codes = None
codes_path = VOICE_SAMPLES[voice_choice]["codes"]
try:
ref_codes_tensor = torch.load(codes_path, map_location="cpu")
if isinstance(ref_codes_tensor, torch.Tensor):
ref_codes = ref_codes_tensor.cpu().numpy()
else:
ref_codes = np.array(ref_codes_tensor)
except Exception as e:
print(f"❌ Lỗi khi tải codes: {e}")
return None
if ref_codes is None or len(ref_codes) == 0:
return None
# Split text
text_chunks = split_text_into_chunks(raw_text, max_chars=MAX_CHARS_PER_CHUNK)
all_audio_segments = []
silence_pad = np.zeros(int(SAMPLE_RATE * 0.15), dtype=np.float32)
start_time = time.time()
try:
for i, chunk in enumerate(text_chunks):
# Phonemize
ref_text_phoneme = phonemize_with_dict(ref_text_raw)
input_text_phoneme = phonemize_with_dict(chunk)
# Create prompt
codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes])
prompt = (
f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text_phoneme} {input_text_phoneme}"
f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
)
# Generate
output = backbone(
prompt,
max_tokens=2048,
temperature=1.0,
top_k=50,
stop=["<|SPEECH_GENERATION_END|>"],
)
output_str = output["choices"][0]["text"]
# Decode - CHỈ DECODE PHẦN OUTPUT (không bao gồm reference codes)
chunk_wav = decode_audio(output_str, codec)
if chunk_wav is not None and len(chunk_wav) > 0:
all_audio_segments.append(chunk_wav)
if i < len(text_chunks) - 1:
all_audio_segments.append(silence_pad)
if not all_audio_segments:
return None
final_wav = np.concatenate(all_audio_segments)
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
sf.write(tmp.name, final_wav, SAMPLE_RATE)
output_path = tmp.name
process_time = time.time() - start_time
# Lưu vào lịch sử
permanent_path = add_to_history(raw_text, voice_choice, output_path, process_time, "Thành công")
return permanent_path
except Exception as e:
import traceback
traceback.print_exc()
return None
# --- SYNTHESIS FUNCTION (UI) ---
def synthesize_speech(text, voice_choice):
"""Main synthesis function với UI feedback"""
global backbone, codec, model_loaded
if not model_loaded:
yield None, "⚠️ Model chưa tải. Vui lòng kiểm tra lỗi console!"
return
if not text or text.strip() == "":
yield None, "⚠️ Vui lòng nhập văn bản!"
return
if voice_choice not in VOICE_SAMPLES:
yield None, "⚠️ Vui lòng chọn giọng mẫu."
return
raw_text = text.strip()
# Load reference text
ref_text_path = VOICE_SAMPLES[voice_choice]["text"]
try:
with open(ref_text_path, "r", encoding="utf-8") as f:
ref_text_raw = f.read()
except Exception as e:
yield None, f"❌ Lỗi đọc file text mẫu: {e}"
return
yield None, "📄 Đang xử lý Reference..."
# Tải codes
ref_codes = None
codes_path = VOICE_SAMPLES[voice_choice]["codes"]
try:
ref_codes_tensor = torch.load(codes_path, map_location="cpu")
if isinstance(ref_codes_tensor, torch.Tensor):
ref_codes = ref_codes_tensor.cpu().numpy()
else:
ref_codes = np.array(ref_codes_tensor)
except Exception as e:
yield None, f"❌ Lỗi khi tải codes: {e}"
return
if ref_codes is None or len(ref_codes) == 0:
yield None, "❌ Codes tham chiếu không hợp lệ."
return
# Split text
text_chunks = split_text_into_chunks(raw_text, max_chars=MAX_CHARS_PER_CHUNK)
total_chunks = len(text_chunks)
yield None, f"🚀 Bắt đầu tổng hợp ({total_chunks} đoạn)..."
all_audio_segments = []
silence_pad = np.zeros(int(SAMPLE_RATE * 0.15), dtype=np.float32)
start_time = time.time()
try:
for i, chunk in enumerate(text_chunks):
yield None, f"⏳ Đang xử lý đoạn {i+1}/{total_chunks}..."
# Phonemize
ref_text_phoneme = phonemize_with_dict(ref_text_raw)
input_text_phoneme = phonemize_with_dict(chunk)
# Create prompt
codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes])
prompt = (
f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text_phoneme} {input_text_phoneme}"
f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
)
# Generate
output = backbone(
prompt,
max_tokens=2048,
temperature=1.0,
top_k=50,
stop=["<|SPEECH_GENERATION_END|>"],
)
output_str = output["choices"][0]["text"]
# Decode - CHỈ DECODE PHẦN OUTPUT (không bao gồm reference codes)
chunk_wav = decode_audio(output_str, codec)
if chunk_wav is not None and len(chunk_wav) > 0:
all_audio_segments.append(chunk_wav)
if i < total_chunks - 1:
all_audio_segments.append(silence_pad)
if not all_audio_segments:
yield None, "❌ Không sinh được audio nào."
return
yield None, "💾 Đang ghép file và lưu..."
final_wav = np.concatenate(all_audio_segments)
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
sf.write(tmp.name, final_wav, SAMPLE_RATE)
output_path = tmp.name
process_time = time.time() - start_time
# Lưu vào lịch sử
permanent_path = add_to_history(raw_text, voice_choice, output_path, process_time, "Thành công")
yield permanent_path, f"✅ Hoàn tất! (Tổng thời gian: {process_time:.2f}s)"
except Exception as e:
import traceback
traceback.print_exc()
add_to_history(raw_text, voice_choice, None, 0, f"Lỗi: {str(e)}")
yield None, f"❌ Lỗi tổng hợp: {str(e)}"
def refresh_history():
"""Làm mới danh sách lịch sử"""
return get_history_list()
def load_history_item(history_index):
"""Tải một item từ lịch sử"""
if not history_index:
return None, "", "", ""
try:
index = int(history_index.split(".")[0]) - 1
history = load_history()
if 0 <= index < len(history):
record = history[index]
return (
record['audio_path'] if os.path.exists(record['audio_path']) else None,
record['full_text'],
record['voice'],
f"📅 {record['timestamp']} | ⏱️ {record['duration']:.2f}s"
)
except:
pass
return None, "", "", ""
# --- UI SETUP ---
theme = gr.themes.Ocean(
primary_hue="indigo",
secondary_hue="cyan",
neutral_hue="slate",
font=[gr.themes.GoogleFont('Inter'), 'ui-sans-serif', 'system-ui'],
).set(
button_primary_background_fill="linear-gradient(90deg, #6366f1 0%, #0ea5e9 100%)",
button_primary_background_fill_hover="linear-gradient(90deg, #4f46e5 0%, #0284c7 100%)",
)
css = """
.container { max-width: 1400px; margin: auto; }
.header-box {
text-align: center;
margin-bottom: 25px;
padding: 25px;
background: linear-gradient(135deg, #0f172a 0%, #1e293b 100%);
border-radius: 12px;
color: white;
}
.header-title {
font-size: 2.5rem;
font-weight: 800;
}
.gradient-text {
background: -webkit-linear-gradient(45deg, #60A5FA, #22D3EE);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
}
.status-box {
font-weight: bold;
text-align: center;
border: none;
background: transparent;
}
.model-card-content {
display: flex;
flex-wrap: wrap;
justify-content: center;
align-items: center;
gap: 15px;
font-size: 0.9rem;
color: #cbd5e1;
}
.model-card-item {
display: flex;
align-items: center;
justify-content: center;
gap: 6px;
color: #94a3b8;
}
.model-card-link {
color: #3b82f6;
text-decoration: none;
font-weight: 500;
transition: color 0.2s;
}
.model-card-link:hover {
color: #2563eb;
text-decoration: underline;
}
.history-list {
max-height: 600px;
overflow-y: auto;
padding: 15px;
background: #f8fafc;
border-radius: 8px;
}
"""
EXAMPLES_LIST = [
["Về miền Tây không chỉ để ngắm nhìn sông nước hữu tình, mà còn để cảm nhận tấm chân tình của người dân nơi đây.", "Vĩnh (nam miền Nam)"],
["Hà Nội những ngày vào thu mang một vẻ đẹp trầm mặc và cổ kính đến lạ thường.", "Bình (nam miền Bắc)"],
["Thành phố Hồ Chí Minh luôn chuyển mình không ngừng với nhịp sống hối hả, năng động.", "Dung (nữ miền Nam)"],
]
initial_status = f"✅ Model đã tải thành công! (Chạy trên **{DEVICE_INFO}**). Hỗ trợ xử lý background và lưu lịch sử." if model_loaded else "❌ Lỗi khi tải model."
with gr.Blocks(title="VieNeu-TTS", theme=theme, css=css) as demo:
with gr.Column(elem_classes="container"):
gr.HTML(f"""
<div class="header-box">
<h1 class="header-title">
<span class="header-icon">🦜</span>
<span class="gradient-text">VieNeu-TTS Studio</span>
</h1>
<p>Chế độ: {DEVICE_INFO} | Background Processing ✅ | History ✅</p>
<p style="font-size: 0.85rem; color: #94a3b8;">📁 Lịch sử lưu tại: {HISTORY_DIR}</p>
<div class="model-card-content">
<div class="model-card-item">
<strong>Repository:</strong>
<a href="https://github.com/pnnbao97/VieNeu-TTS" target="_blank" class="model-card-link">GitHub</a>
</div>
<div class="model-card-item">
<strong>Tác giả:</strong>
<span>Phạm Nguyễn Ngọc Bảo</span>
</div>
</div>
</div>
""")
gr.Markdown(initial_status)
# --- TABS ---
with gr.Tabs():
# TAB 1: Tổng hợp
with gr.Tab("🎙️ Tổng hợp"):
with gr.Row(elem_classes="container"):
with gr.Column(scale=3):
text_input = gr.Textbox(
label=f"Văn bản (Chia chunk: {MAX_CHARS_PER_CHUNK} ký tự)",
lines=6,
value="Hà Nội, trái tim của Việt Nam, là một thành phố ngàn năm văn hiến với bề dày lịch sử và văn hóa độc đáo. Bước chân trên những con phố cổ kính quanh Hồ Hoàn Kiếm, du khách như được du hành ngược thời gian.",
)
voice_select = gr.Dropdown(
choices=list(VOICE_SAMPLES.keys()),
value=list(VOICE_SAMPLES.keys())[0],
label="👤 Chọn giọng mẫu",
)
btn_generate = gr.Button("🎵 Bắt đầu tổng hợp", variant="primary", size="lg", interactive=model_loaded)
with gr.Column(scale=2):
audio_output = gr.Audio(
label="Kết quả",
type="filepath",
autoplay=True
)
status_output = gr.Textbox(label="Trạng thái", elem_classes="status-box", value="Chờ nhập văn bản...")
gr.Examples(
examples=EXAMPLES_LIST,
inputs=[text_input, voice_select],
outputs=[audio_output, status_output],
fn=synthesize_speech,
cache_examples=False,
label="Các ví dụ nhanh"
)
# TAB 2: Lịch sử
with gr.Tab("📜 Lịch sử"):
with gr.Row():
with gr.Column(scale=1):
btn_refresh = gr.Button("🔄 Làm mới", size="sm")
history_display = gr.Markdown(
value=get_history_list(),
elem_classes="history-list"
)
with gr.Column(scale=1):
gr.Markdown("### Chi tiết")
history_audio = gr.Audio(label="Audio", type="filepath")
history_text = gr.Textbox(label="Văn bản đầy đủ", lines=5)
history_voice = gr.Textbox(label="Giọng đã dùng")
history_info = gr.Textbox(label="Thông tin")
history_select = gr.Textbox(
label="Nhập số thứ tự để xem (vd: 1)",
placeholder="Nhập số..."
)
btn_load_history = gr.Button("📂 Tải item", variant="secondary")
btn_refresh.click(
fn=refresh_history,
outputs=history_display
)
btn_load_history.click(
fn=load_history_item,
inputs=history_select,
outputs=[history_audio, history_text, history_voice, history_info]
)
# Event handlers
btn_generate.click(
fn=synthesize_speech,
inputs=[text_input, voice_select],
outputs=[audio_output, status_output]
)
if __name__ == "__main__":
demo.queue().launch(
server_name="0.0.0.0",
server_port=7860,
)