"""Gradio text-to-speech app.

Synthesises speech with edge-tts and builds a word-highlighted ("karaoke")
ASS subtitle track with pysubs2. Input may be plain text, base64-encoded
text, or a Google Docs link. Results can optionally be uploaded and reported
to a webhook.
"""

import asyncio
import base64
import json
import os
import re
import tempfile
from functools import lru_cache
from pathlib import Path

import edge_tts
import emoji
import gradio as gr
import pysubs2
import requests

# Locale prefixes of the edge-tts voices offered in the UI.
VOICES = ['en-US', 'ja-JP', 'ko-KR', 'zh-CN', 'vi-VN']

# Timeouts (seconds) for outbound HTTP calls, so a stalled server cannot hang
# a request handler forever.
HTTP_TIMEOUT = 10
DOWNLOAD_TIMEOUT = 30
UPLOAD_TIMEOUT = 120

# Maximum length of one subtitle line, counting one separator per word.
SUBTITLE_LINE_MAX_CHARS = 15


def config_subs():
    """Return an empty ``SSAFile`` with the app's default subtitle style."""
    subs = pysubs2.SSAFile()
    s = subs.styles["Default"]
    s.fontname = "Nanum Barun Gothic"
    s.fontsize = 44
    s.outline = 2
    s.shadow = 1
    s.bold = True
    s.alignment = 2
    s.marginv = 58
    subs.info["WrapStyle"] = 1
    return subs


@lru_cache(maxsize=1)
def get_voices():
    """Return ``{"<ShortName> - <G>": "<ShortName>"}`` for the supported locales.

    ``<G>`` is the first letter of the voice's gender. The result is cached,
    so the edge-tts voice list is fetched at most once per process.
    """
    async def list_voices():
        voices = await edge_tts.list_voices()
        voices = [
            v for v in voices
            if any(v['ShortName'].startswith(lang) for lang in VOICES)
        ]
        return {f"{v['ShortName']} - {v['Gender'][0]}": v['ShortName'] for v in voices}

    return asyncio.run(list_voices())


# Best-effort lookup of the server's public IP, shown in the page header.
# Any failure (network, bad JSON, unexpected payload) falls back to "unknown".
try:
    r = requests.get("http://ip-api.com/json/", timeout=5)
    data = r.json()
    ip_info = f"{data.get('query','')} | {data.get('country','')} | {data.get('city','')}"
except Exception:
    ip_info = "unknown"


def text_gdoc(text: str) -> str:
    """Return the plain-text export of a Google Docs link.

    Text that does not mention ``docs.google.com`` is returned unchanged.

    Raises:
        ValueError: the link has no ``/d/<document id>`` segment.
        requests.HTTPError: the export request returned an error status.
    """
    if not re.search(r"docs\.google\.com", text):
        return text
    match = re.search(r"/d/([a-zA-Z0-9_-]+)", text)
    if match is None:
        raise ValueError("Could not find a document id in the Google Docs link")
    doc_id = match.group(1)
    r = requests.get(
        f"https://docs.google.com/document/d/{doc_id}/export?format=txt",
        timeout=DOWNLOAD_TIMEOUT,
    )
    r.raise_for_status()
    return r.text


def decode_base64(text: str):
    """Decode *text* as base64-encoded UTF-8, or return it unchanged.

    Empty input and input that is not strictly valid base64, or whose decoded
    bytes are not valid UTF-8, are returned as-is.
    """
    if not text:
        return text
    try:
        return base64.b64decode(text, validate=True).decode("utf-8")
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses,
        # as is the error raised for a non-ASCII str argument.
        return text


def render_audio(text, voice, rate, pitch):
    """Synthesise *text* and return ``(audio_bytes, subtitle_entries)``.

    *voice* is a dropdown label of the form ``"<ShortName> - <G>"``; only the
    short name is passed on. The text is split with ``split_chunks`` and the
    audio of all chunks is concatenated.
    """
    voice_short = voice.split(" - ")[0]
    chunks = split_chunks(text)

    async def run():
        audio_buf = bytearray()
        sub_data = []
        for chunk in chunks:
            audio_chunk, subs_chunk = await tts_chunk(chunk, voice_short, rate, pitch)
            audio_buf.extend(audio_chunk)
            # NOTE(review): each chunk's timestamps start at 0, so with more
            # than one chunk the later subtitles overlap the first chunk.
            # Fixing this needs the real audio duration of each chunk.
            sub_data.extend(subs_chunk)
        return bytes(audio_buf), sub_data

    return asyncio.run(run())


async def tts_comunicate(chunk, voice, rate, pitch):
    """Stream one chunk through edge-tts.

    Returns ``(audio, words)`` where *audio* is a ``bytearray`` and *words* is
    a list of ``(start, end, text)`` tuples, one per word boundary event.
    """
    comm = edge_tts.Communicate(
        chunk,
        voice,
        # int() guards against a float slider value, which "+d" rejects.
        rate=f"{int(rate):+d}%",
        pitch=f"{int(pitch):+d}Hz",
        boundary="WordBoundary",
    )
    audio_buf = bytearray()
    words_data = []
    async for c in comm.stream():
        if c["type"] == "audio":
            audio_buf.extend(c["data"])
        elif c["type"] == "WordBoundary":
            # Presumably 100 ns ticks -> milliseconds; confirm against edge-tts.
            start = int(c["offset"] / 10_000)
            end = start + int(c["duration"] / 10_000)
            words_data.append((start, end, c["text"]))
    return audio_buf, words_data


def _group_by_length(items, text_of, max_length):
    """Greedily pack *items* into groups whose text fits in *max_length*.

    Each item costs ``len(text_of(item)) + 1``. An item that is too long on
    its own still gets a group of its own.
    """
    groups, current, current_length = [], [], 0
    for item in items:
        cost = len(text_of(item)) + 1
        if current and current_length + cost > max_length:
            groups.append(current)
            current, current_length = [], 0
        current.append(item)
        current_length += cost
    if current:
        groups.append(current)
    return groups


async def tts_chunk(chunk, voice, rate, pitch):
    """Synthesise one chunk and group its words into subtitle lines.

    Returns ``(audio_bytes, entries)``; each entry is a dict with ``start``,
    ``end``, ``text`` and ``words`` (the ``(start, end, text)`` tuples of that
    line).
    """
    audio_buf, words_data = await tts_comunicate(chunk, voice, rate, pitch)
    # Group the word tuples themselves rather than re-splitting joined text,
    # so a boundary text containing whitespace cannot shift the timings.
    subs = [
        {
            "start": group[0][0],
            "end": group[-1][1],
            "text": " ".join(word[2] for word in group),
            "words": group,
        }
        for group in _group_by_length(
            words_data, lambda word: word[2], SUBTITLE_LINE_MAX_CHARS
        )
    ]
    return bytes(audio_buf), subs


def split_chunks(text, max_bytes=2*1024*1024):
    """Split *text* on blank lines into chunks of at most *max_bytes* UTF-8 bytes.

    A leading byte-order mark is dropped and chunks are stripped; chunks that
    are empty after stripping are skipped.
    """
    if text.startswith('\ufeff'):
        text = text[1:]
    chunks, cur = [], ""
    for para in text.split("\n\n"):
        candidate = f"{cur}\n\n{para}" if cur else para
        if len(candidate.encode('utf-8')) > max_bytes:
            if cur.strip():
                chunks.append(cur.strip())
            # NOTE(review): a single paragraph larger than max_bytes is kept
            # whole, so such a chunk can still exceed the limit.
            cur = para
        else:
            cur = candidate
    if cur.strip():
        chunks.append(cur.strip())
    return chunks


def group_words(words, max_length=15):
    """Join *words* into space-separated lines of limited length.

    Each word counts as ``len(word) + 1`` towards *max_length*.
    """
    return [
        " ".join(group)
        for group in _group_by_length(words, lambda word: word, max_length)
    ]


def create_word_highlight(words, current_start, current_end):
    """Return the line text with the currently spoken word colour-tagged.

    A word is highlighted when both its start and end equal *current_start*
    and *current_end*; the colour is reset to white after it.
    """
    parts = []
    for start, end, word in words:
        if start == current_start and end == current_end:
            parts.append(f"{{\\c&H00FFFF&}}{word}{{\\c&HFFFFFF&}}")
        else:
            parts.append(word)
    return " ".join(parts)


def add_karaoke_subtitles(subs, words):
    """Append one event per word to *subs*, highlighting that word.

    Each event lasts until the next word starts; the last one ends with its
    own word.
    """
    if not words:
        return
    for i, (start, end, word) in enumerate(words):
        event_end = words[i + 1][0] if i < len(words) - 1 else end
        highlighted = create_word_highlight(words, start, end)
        subs.append(pysubs2.SSAEvent(start=start, end=event_end, text=highlighted))


def base64_encode(s):
    """Return the base64 encoding of the UTF-8 bytes of *s*, as a string."""
    return base64.b64encode(s.encode("utf-8")).decode("utf-8")


def _request_upload_url(key) -> str:
    """Ask the upload API for the URL to PUT the object named *key* to."""
    r = requests.get(
        'https://g86.xyz/api/upload', params={"k": key}, timeout=HTTP_TIMEOUT
    )
    r.raise_for_status()
    return r.text


def _upload_and_notify(audio_path, subtitle_path, webhook):
    """Upload both result files, then POST their public URLs to *webhook*."""
    # NOTE(review): the webhook URL is user-supplied and not validated, so
    # the server will POST to any address it is given.
    print('webhook', webhook)
    for path in (audio_path, subtitle_path):
        upload_url = _request_upload_url(os.path.basename(path))
        print('upload to:', upload_url)
        mime = "audio/mpeg" if path.endswith(".mp3") else "text/plain"
        with open(path, "rb") as f:
            r = requests.put(
                upload_url,
                data=f,
                headers={"Content-Type": mime},
                timeout=UPLOAD_TIMEOUT,
            )
        r.raise_for_status()
    dm = 'http://s3.g86.xyz'
    payload = {
        "data": {
            "audio": f'{dm}/{os.path.basename(audio_path)}',
            "subtitle": f'{dm}/{os.path.basename(subtitle_path)}',
        },
        "status": "success",
    }
    print('output: ', payload)
    requests.post(webhook, json=payload, timeout=10)


def speech(text, voice, rate, pitch, webhook=None):
    """Turn *text* into an MP3 file and an ASS subtitle file.

    An input starting with ``https://`` is passed to ``text_gdoc``; any other
    input is passed to ``decode_base64``. Emoji are removed from the resolved
    text before synthesis. When *webhook* is given, both files are uploaded
    and their URLs are posted to it.

    Returns:
        ``(audio_path, subtitle_path)`` of two temporary files. They are not
        deleted here.

    Raises:
        ValueError: no text remains after cleaning, or no voice was selected.
    """
    text = (text or "").strip()
    if text.startswith("https://"):
        text = text_gdoc(text)
    else:
        text = decode_base64(text)
    # Strip emoji after resolving the source, so emoji inside a fetched
    # document or a decoded payload are removed as well.
    text = emoji.replace_emoji(text or "", replace='').strip()

    if not text:
        raise ValueError("Please enter text")
    if not voice:
        raise ValueError("Please select a voice")

    print('text: ', text[:10], '...')
    subs = config_subs()
    audio_bytes, sub_data = render_audio(text, voice, rate, pitch)
    for sub in sub_data:
        if "words" in sub:
            add_karaoke_subtitles(subs, sub["words"])
        else:
            subs.append(
                pysubs2.SSAEvent(start=sub["start"], end=sub["end"], text=sub["text"])
            )
    ass_content = subs.to_string("ass")

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as audio_file:
        audio_file.write(audio_bytes)
    # The ".txt" suffix is kept as-is; the content is ASS.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".txt", mode="w", encoding="utf-8"
    ) as subtitle_file:
        subtitle_file.write(ass_content)

    webhook = (webhook or "").strip()
    if webhook:
        _upload_and_notify(audio_file.name, subtitle_file.name, webhook)

    return audio_file.name, subtitle_file.name


# --------------------------
# Gradio UI
# --------------------------
# NOTE(review): widget nesting was reconstructed from a source whose
# indentation was lost; confirm the layout.
voices = get_voices()

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=3):
            gr.Markdown("# Text to Speech")
            gr.Markdown(ip_info)  # show the brief IP info as soon as the page loads
        with gr.Column(scale=1):
            submit_btn = gr.Button("RUN", variant="primary")

    input_text = gr.Textbox(
        label="Input",
        lines=5,
        info="You can enter text or a Google Docs link",
        placeholder="Enter text or Google Docs link here...",
    )
    voice_dropdown = gr.Dropdown(
        choices=list(voices.keys()),
        label="Voice",
        # next(..., None) avoids an IndexError when no voices were returned.
        value=next(iter(voices), None),
    )

    with gr.Accordion("Advanced Options", open=False):
        rate_in = gr.Slider(-100, 100, value=0, label="Speech Rate (%)", step=10)
        pitch_in = gr.Slider(-100, 100, value=0, label="Pitch (Hz)", step=10)
        webhook_in = gr.Textbox(
            label="Webhook URL (optional)",
            placeholder="Enter webhook URL if you want the output sent there...",
        )

    submit_btn.click(
        fn=speech,
        inputs=[input_text, voice_dropdown, rate_in, pitch_in, webhook_in],
        outputs=[
            gr.Audio(
                label="Generated Audio",
                autoplay=False,
                show_download_button=True,
                type="filepath",
            ),
            gr.File(label="ASS Subtitle", interactive=False),
        ],
    )

if __name__ == "__main__":
    demo.launch(show_api=False, show_error=True)