tool / pkg /tts_run.py
Vo Hoang Minh
up
ecbf30b
import os
import gradio as gr
import asyncio
import base64
from functools import lru_cache
import re
import tempfile
import edge_tts
import pysubs2
import requests
import emoji
from pathlib import Path
import json
# Locale prefixes used to filter the edge-tts voice catalogue: a voice is
# offered only if its ShortName starts with one of these.
VOICES = ['en-US', 'ja-JP', 'ko-KR', 'zh-CN', 'vi-VN']
def config_subs():
    """Create an empty subtitle file with a preconfigured "Default" style.

    Returns:
        A new ``pysubs2.SSAFile`` whose "Default" style has the font, size,
        outline, shadow, bold, alignment and vertical margin set below, and
        whose script info has ``WrapStyle`` set to 1.
    """
    ssa = pysubs2.SSAFile()
    ssa.info["WrapStyle"] = 1

    default_style = ssa.styles["Default"]
    style_settings = (
        ("fontname", "Nanum Barun Gothic"),
        ("fontsize", 44),
        ("outline", 2),
        ("shadow", 1),
        ("bold", True),
        ("alignment", 2),
        ("marginv", 58),
    )
    for attribute, value in style_settings:
        setattr(default_style, attribute, value)
    return ssa
@lru_cache(maxsize=1)
def get_voices():
    """Return the selectable voices as a ``{label: ShortName}`` mapping.

    The edge-tts voice list is fetched once (the result is cached) and
    filtered to voices whose ShortName starts with one of ``VOICES``.
    Each label is ``"<ShortName> - <first letter of Gender>"``.
    """
    prefixes = tuple(VOICES)

    async def _fetch():
        catalogue = await edge_tts.list_voices()
        labelled = {}
        for voice in catalogue:
            short_name = voice['ShortName']
            if short_name.startswith(prefixes):
                labelled[f"{short_name} - {voice['Gender'][0]}"] = short_name
        return labelled

    return asyncio.run(_fetch())
# Best-effort lookup of this host's public IP / country / city for display in
# the UI header; any failure falls back to the literal "unknown".
try:
    r = requests.get("http://ip-api.com/json/", timeout=5)
    data = r.json()
    ip_info = " | ".join(
        str(data.get(field, '')) for field in ('query', 'country', 'city')
    )
except Exception:
    ip_info = "unknown"
def text_gdoc(text: str) -> str:
    """Return the plain-text export of a Google Docs link.

    Args:
        text: Either arbitrary text or a string containing a
            ``docs.google.com`` document URL.

    Returns:
        ``text`` unchanged when it does not mention ``docs.google.com``;
        otherwise the body of the document's ``export?format=txt`` response.

    Raises:
        ValueError: The link mentions docs.google.com but has no
            ``/d/<document id>`` segment.
        requests.RequestException: The export request failed, timed out, or
            returned an error status.
    """
    if not re.search(r"docs\.google\.com", text):
        return text

    match = re.search(r"/d/([a-zA-Z0-9_-]+)", text)
    if match is None:
        # Previously this crashed with AttributeError on `None.group`.
        raise ValueError("Could not find a document id in the Google Docs link")
    doc_id = match.group(1)

    # A timeout keeps a stalled export from blocking the request forever.
    r = requests.get(
        f"https://docs.google.com/document/d/{doc_id}/export?format=txt",
        timeout=30,
    )
    r.raise_for_status()
    return r.text
def decode_base64(text: str):
    """Decode ``text`` as base64-encoded UTF-8, falling back to ``text`` itself.

    Args:
        text: Candidate base64 string (may be empty or ``None``).

    Returns:
        The decoded string when ``text`` is strictly valid base64 whose bytes
        are valid UTF-8; otherwise ``text`` unchanged. Note that ordinary text
        which happens to satisfy both conditions will also be decoded.
    """
    if not text:
        return text
    try:
        return base64.b64decode(text, validate=True).decode("utf-8")
    except ValueError:
        # binascii.Error (bad alphabet/padding), the non-ASCII-input error and
        # UnicodeDecodeError are all ValueError subclasses: treat the input as
        # plain text. Other exceptions (e.g. KeyboardInterrupt) now propagate.
        return text
def render_audio(text, voice, rate, pitch):
    """Synthesize ``text`` chunk by chunk and collect audio plus subtitle data.

    Args:
        text: Full input text; it is split with ``split_chunks``.
        voice: Voice identifier; only the part before the first " - " is used.
        rate: Speech-rate adjustment forwarded to ``tts_chunk``.
        pitch: Pitch adjustment forwarded to ``tts_chunk``.

    Returns:
        ``(audio_bytes, sub_data)``: the concatenated audio of every chunk and
        the concatenated subtitle entries of every chunk.
    """
    voice_short = voice.split(" - ")[0]
    pieces = split_chunks(text)

    async def _synthesize_all():
        audio_parts = []
        cues = []
        # Chunks are processed strictly in order, one at a time.
        # NOTE(review): cue times appear to be relative to each chunk's own
        # stream, so with more than one chunk later cues may not be offset by
        # the preceding audio -- confirm.
        for piece in pieces:
            piece_audio, piece_cues = await tts_chunk(piece, voice_short, rate, pitch)
            audio_parts.append(piece_audio)
            cues.extend(piece_cues)
        return b"".join(audio_parts), cues

    return asyncio.run(_synthesize_all())
async def tts_comunicate(chunk, voice, rate, pitch):
    """Stream one chunk through edge-tts, collecting audio and word timings.

    Args:
        chunk: Text to synthesize.
        voice: edge-tts voice short name.
        rate: Integer rate change, sent as a signed percentage (e.g. ``+10%``).
        pitch: Integer pitch change, sent as a signed Hz value (e.g. ``-20Hz``).

    Returns:
        ``(audio, words)`` where ``audio`` is a ``bytearray`` of all audio
        payloads in arrival order and ``words`` is a list of
        ``(start, end, text)`` tuples, one per WordBoundary event.
    """
    communicator = edge_tts.Communicate(
        chunk,
        voice,
        rate=f"{rate:+d}%",
        pitch=f"{pitch:+d}Hz",
        boundary="WordBoundary",
    )
    audio = bytearray()
    timings = []
    async for event in communicator.stream():
        kind = event["type"]
        if kind == "audio":
            audio.extend(event["data"])
            continue
        if kind == "WordBoundary":
            # Offsets/durations are scaled down by 10_000 -- presumably from
            # 100-ns ticks to milliseconds; confirm against edge-tts.
            begin = int(event["offset"] / 10_000)
            finish = begin + int(event["duration"] / 10_000)
            timings.append((begin, finish, event["text"]))
    return audio, timings
async def tts_chunk(chunk, voice, rate, pitch):
    """Synthesize one chunk and group its word timings into subtitle lines.

    Args:
        chunk: Text to synthesize.
        voice: edge-tts voice short name.
        rate: Rate adjustment forwarded to ``tts_comunicate``.
        pitch: Pitch adjustment forwarded to ``tts_comunicate``.

    Returns:
        ``(audio_bytes, subs)`` where ``subs`` is a list of dicts with keys
        ``"start"``, ``"end"``, ``"text"`` (the joined line) and ``"words"``
        (the ``(start, end, text)`` tuples that make up the line).
    """
    audio_buf, words_data = await tts_comunicate(chunk, voice, rate, pitch)
    words_only = [w[2] for w in words_data]

    subs = []
    idx = 0
    for line in group_words(words_only):
        if idx >= len(words_data):
            break  # defensive: group_words should never yield more than it was given
        # Recover how many boundary entries were joined into `line` from its
        # length rather than from `line.split()`: a boundary text that contains
        # whitespace (or is empty) would make a split-based count drift and
        # misalign every following line.
        count = 1
        joined_len = len(words_only[idx])
        while joined_len < len(line) and idx + count < len(words_only):
            joined_len += 1 + len(words_only[idx + count])
            count += 1

        line_words = words_data[idx:idx + count]
        subs.append({
            "start": line_words[0][0],
            "end": line_words[-1][1],
            "text": line,
            "words": line_words,
        })
        idx += count
    return bytes(audio_buf), subs
def split_chunks(text, max_bytes=2*1024*1024):
    """Split ``text`` on blank lines into chunks of at most ``max_bytes``.

    Paragraphs (separated by ``"\\n\\n"``) are packed greedily; sizes are
    measured in UTF-8 bytes. A leading byte-order mark is removed first.

    Args:
        text: Input text.
        max_bytes: Upper bound on the UTF-8 size of a packed chunk.

    Returns:
        List of stripped, non-empty chunks in document order. A single
        paragraph larger than ``max_bytes`` is kept whole as its own chunk.
    """
    if text.startswith('\ufeff'):
        text = text[1:]

    chunks = []

    def emit(parts):
        # Whitespace-only accumulations are dropped instead of being emitted
        # as an empty chunk.
        chunk = "\n\n".join(parts).strip()
        if chunk:
            chunks.append(chunk)

    # Track the accumulated size incrementally instead of re-encoding the
    # whole pending chunk for every paragraph (which was quadratic).
    parts, size = [], 0
    for para in text.split("\n\n"):
        para_size = len(para.encode('utf-8'))
        candidate = size + 2 + para_size if size else para_size
        if candidate > max_bytes:
            if size:
                emit(parts)
            parts, size = [para], para_size
        elif size:
            parts.append(para)
            size = candidate
        else:
            # Nothing (or only empty paragraphs) pending: start afresh.
            parts, size = [para], para_size
    if size:
        emit(parts)
    return chunks
def group_words(words, max_length=15):
    """Greedily pack consecutive words into space-joined lines.

    Each word costs ``len(word) + 1``; a word joins the pending line while the
    line's total cost stays within ``max_length``, otherwise the pending line
    is closed and the word starts a new one. A word that alone exceeds the
    limit still gets its own line.

    Args:
        words: Sequence of word strings.
        max_length: Cost budget per line.

    Returns:
        List of joined lines, in order.
    """
    lines = []
    pending = []
    used = 0
    for word in words:
        cost = len(word) + 1
        if used + cost > max_length:
            if pending:
                lines.append(" ".join(pending))
            pending = [word]
            used = cost
            continue
        pending.append(word)
        used += cost
    if pending:
        lines.append(" ".join(pending))
    return lines
def create_word_highlight(words, current_start, current_end):
    """Render a line of words with the current word wrapped in colour tags.

    Args:
        words: Iterable of ``(start, end, text)`` tuples.
        current_start: Start time identifying the word to highlight.
        current_end: End time identifying the word to highlight.

    Returns:
        The word texts joined by single spaces; every word whose start and end
        both match is wrapped in ``{\\c&H00FFFF&}`` ... ``{\\c&HFFFFFF&}``.
    """
    def render(start, end, text):
        is_current = (start, end) == (current_start, current_end)
        return f"{{\\c&H00FFFF&}}{text}{{\\c&HFFFFFF&}}" if is_current else text

    return " ".join(render(start, end, text) for start, end, text in words)
def add_karaoke_subtitles(subs, words):
    """Append one subtitle event per word, highlighting that word in its line.

    Every event shows the full ``words`` line with the current word coloured.
    It starts at the word's start and lasts until the next word starts (the
    final word lasts until its own end).

    Args:
        subs: Subtitle container supporting ``append``; mutated in place.
        words: Sequence of ``(start, end, text)`` tuples forming one line.
    """
    if not words:
        return

    last_index = len(words) - 1
    for position, (word_start, word_end, _text) in enumerate(words):
        if position < last_index:
            shown_until = words[position + 1][0]
        else:
            shown_until = word_end
        line_text = create_word_highlight(words, word_start, word_end)
        event = pysubs2.SSAEvent(start=word_start, end=shown_until, text=line_text)
        subs.append(event)
def base64_encode(s):
    """Return the base64 text of ``s`` encoded as UTF-8."""
    raw = s.encode("utf-8")
    encoded = base64.b64encode(raw)
    return encoded.decode("utf-8")
def _publish_outputs(webhook, audio_path, subtitle_path):
    """Upload both output files and POST their URLs to ``webhook``."""
    print('webhook', webhook)
    for path in (audio_path, subtitle_path):
        name = os.path.basename(path)
        # Ask the upload API for the URL this file should be PUT to.
        r = requests.get('https://g86.xyz/api/upload', params={"k": name}, timeout=10)
        r.raise_for_status()
        upload_url = r.text
        print('upload to:', upload_url)
        mime = "audio/mpeg" if path.endswith(".mp3") else "text/plain"
        with open(path, "rb") as f:
            # (connect, read) timeout: generous read window for large audio.
            r = requests.put(upload_url, data=f, headers={"Content-Type": mime}, timeout=(10, 300))
        r.raise_for_status()

    dm = 'http://s3.g86.xyz'
    payload = {
        "data": {
            "audio": f'{dm}/{os.path.basename(audio_path)}',
            "subtitle": f'{dm}/{os.path.basename(subtitle_path)}',
        },
        "status": "success",
    }
    print('output: ', payload)
    # NOTE(review): `webhook` is user-supplied, so this makes the server issue
    # a request to an arbitrary URL -- consider restricting allowed hosts.
    requests.post(webhook, json=payload, timeout=10)


def speech(text, voice, rate, pitch, webhook=None):
    """Turn text (or a Google Docs link / base64 text) into audio and subtitles.

    Args:
        text: Raw text, an ``https://`` Google Docs link, or base64-encoded
            UTF-8 text. Emoji are removed before processing.
        voice: Voice label; passed on to ``render_audio``.
        rate: Speech-rate adjustment.
        pitch: Pitch adjustment.
        webhook: Optional URL; when set, both files are uploaded and a JSON
            payload with their URLs is POSTed to it.

    Returns:
        ``(audio_path, subtitle_path)``: paths of the temporary ``.mp3`` file
        and of the ``.txt`` file holding the ASS-formatted subtitles. The
        files are not deleted by this function.

    Raises:
        ValueError: No text remains after cleaning, or no voice was selected.
        requests.RequestException: A document download or upload failed.
    """
    # Tolerate a missing value (None) the same way as an empty string.
    text = (text or "").strip()
    if text:
        # Strip again: removing a leading emoji can leave whitespace in front
        # of a link, which would defeat the prefix check below.
        text = emoji.replace_emoji(text, replace='').strip()
        if text.startswith("https://"):
            text = text_gdoc(text)
        else:
            text = decode_base64(text)
    if not text:
        raise ValueError("Please enter text")
    if not voice:
        raise ValueError("Please select a voice")
    print('text: ', text[:10], '...')

    subs = config_subs()
    audio_bytes, sub_data = render_audio(text, voice, rate, pitch)
    for sub in sub_data:
        if "words" in sub:
            add_karaoke_subtitles(subs, sub["words"])
        else:
            subs.append(pysubs2.SSAEvent(start=sub["start"], end=sub["end"], text=sub["text"]))
    ass_content = subs.to_string("ass")

    # delete=False: the paths are handed back to the caller after closing.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as audio_file:
        audio_file.write(audio_bytes)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="w", encoding="utf-8") as srt_file:
        srt_file.write(ass_content)

    if webhook:
        _publish_outputs(webhook, audio_file.name, srt_file.name)
    return audio_file.name, srt_file.name
# --------------------------
# Gradio UI
# --------------------------
voices = get_voices()
# Materialise the labels once; an empty catalogue must not crash the UI build
# (indexing [0] on an empty list raised IndexError).
voice_choices = list(voices)

# NOTE(review): the widget nesting below was reconstructed from a source whose
# indentation was lost -- confirm the layout matches the intended one.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=3):
            gr.Markdown("# Text to Speech")
            gr.Markdown(ip_info)  # show the compact IP summary as soon as the page loads
        with gr.Column(scale=1):
            submit_btn = gr.Button("RUN", variant="primary")
    input_text = gr.Textbox(label="Input", lines=5, info="You can enter text or a Google Docs link", placeholder="Enter text or Google Docs link here...")
    voice_dropdown = gr.Dropdown(
        choices=voice_choices,
        label="Voice",
        value=voice_choices[0] if voice_choices else None,
    )
    with gr.Accordion("Advanced Options", open=False):
        rate_in = gr.Slider(-100, 100, value=0, label="Speech Rate (%)", step=10)
        pitch_in = gr.Slider(-100, 100, value=0, label="Pitch (Hz)", step=10)
        webhook_in = gr.Textbox(label="Webhook URL (optional)", placeholder="Enter webhook URL if you want the output sent there...")
    # Wire the RUN button to `speech`; outputs are the audio file and the
    # subtitle file it returns.
    submit_btn.click(
        fn=speech,
        inputs=[input_text, voice_dropdown, rate_in, pitch_in, webhook_in],
        outputs=[
            gr.Audio(label="Generated Audio", autoplay=False, show_download_button=True, type="filepath"),
            gr.File(label="ASS Subtitle", interactive=False),
        ],
    )

if __name__ == "__main__":
    demo.launch(show_api=False, show_error=True)