| import os |
| import gradio as gr |
| import asyncio |
| import base64 |
| from functools import lru_cache |
| import re |
| import tempfile |
| import edge_tts |
| import pysubs2 |
| import requests |
| import emoji |
| from pathlib import Path |
| import json |
|
|
|
|
# Locale prefixes used to filter the edge-tts voice catalogue: only voices
# whose ShortName starts with one of these are offered in the UI.
VOICES = [
    "en-US",
    "ja-JP",
    "ko-KR",
    "zh-CN",
    "vi-VN",
]
|
|
def config_subs():
    """Create an empty subtitle file with the app's default look.

    Returns:
        A new ``pysubs2.SSAFile`` whose "Default" style has been customised
        (font, size, outline, shadow, bold, alignment, vertical margin) and
        whose script info sets ``WrapStyle`` to 1.
    """
    document = pysubs2.SSAFile()
    document.info["WrapStyle"] = 1

    # Overrides applied to the built-in "Default" style, in attribute order.
    style_overrides = {
        "fontname": "Nanum Barun Gothic",
        "fontsize": 44,
        "outline": 2,
        "shadow": 1,
        "bold": True,
        "alignment": 2,  # presumably bottom-centre (ASS numpad numbering)
        "marginv": 58,
    }
    default_style = document.styles["Default"]
    for attribute, value in style_overrides.items():
        setattr(default_style, attribute, value)

    return document
|
|
@lru_cache(maxsize=1)
def get_voices():
    """Return the selectable voices as a ``{label: short_name}`` dict.

    The edge-tts catalogue is fetched once (the result is memoised by
    ``lru_cache``) and filtered to voices whose ``ShortName`` starts with one
    of the prefixes in ``VOICES``. Each label has the form
    ``"<ShortName> - <first letter of Gender>"``.
    """
    allowed_prefixes = tuple(VOICES)

    async def fetch_catalogue():
        labelled = {}
        for entry in await edge_tts.list_voices():
            short_name = entry["ShortName"]
            if short_name.startswith(allowed_prefixes):
                labelled[f"{short_name} - {entry['Gender'][0]}"] = short_name
        return labelled

    # Synchronous wrapper: drives the coroutine on a fresh event loop.
    return asyncio.run(fetch_catalogue())
|
|
|
|
# Best-effort lookup of the host's public IP and location, shown in the UI
# header. Any failure (network, bad JSON, unexpected payload) degrades to
# the placeholder "unknown" instead of preventing the app from starting.
try:
    data = requests.get("http://ip-api.com/json/", timeout=5).json()
    ip_info = f"{data.get('query','')} | {data.get('country','')} | {data.get('city','')}"
except Exception:
    ip_info = "unknown"
|
|
|
|
def text_gdoc(text: str) -> str:
    """Resolve a Google Docs link to the document's plain-text content.

    Args:
        text: Either arbitrary text or a URL containing ``docs.google.com``.

    Returns:
        ``text`` unchanged when it does not mention ``docs.google.com``;
        otherwise the body of the document's ``export?format=txt`` endpoint.

    Raises:
        ValueError: The link mentions Google Docs but has no ``/d/<id>`` part.
        requests.RequestException: The download failed, timed out, or
            returned an HTTP error status.
    """
    if not re.search(r"docs\.google\.com", text):
        return text

    match = re.search(r"/d/([a-zA-Z0-9_-]+)", text)
    if match is None:
        # Previously this surfaced as an opaque AttributeError on ``None``.
        raise ValueError("Could not find a document id in the Google Docs link")
    doc_id = match.group(1)

    # A timeout keeps a stalled connection from hanging the request forever.
    response = requests.get(
        f"https://docs.google.com/document/d/{doc_id}/export?format=txt",
        timeout=30,
    )
    response.raise_for_status()
    return response.text
|
|
def decode_base64(text: str):
    """Decode ``text`` from base64 if it is valid base64-encoded UTF-8.

    Args:
        text: Candidate base64 string (or plain text).

    Returns:
        The decoded UTF-8 string when ``text`` is strictly valid base64 whose
        bytes decode as UTF-8; otherwise ``text`` unchanged. Falsy input
        (``""``, ``None``) is returned as-is.
    """
    if not text:
        return text

    try:
        raw = base64.b64decode(text, validate=True)
        return raw.decode("utf-8")
    except (ValueError, TypeError):
        # ValueError covers binascii.Error (bad alphabet/padding), non-ASCII
        # input and UnicodeDecodeError; TypeError covers non-string input.
        # Unlike a bare ``except``, KeyboardInterrupt/SystemExit propagate.
        return text
|
|
|
|
def render_audio(text, voice, rate, pitch):
    """Synthesize ``text`` and collect its subtitle data.

    Args:
        text: Text to speak; it is divided with ``split_chunks``.
        voice: Dropdown label of the form ``"<ShortName> - <G>"``; only the
            part before ``" - "`` is passed on as the voice name.
        rate: Speech-rate adjustment forwarded to ``tts_chunk``.
        pitch: Pitch adjustment forwarded to ``tts_chunk``.

    Returns:
        ``(audio_bytes, sub_data)``: the audio of all chunks concatenated in
        order, and the list of subtitle dicts of all chunks in order.
    """
    short_name = voice.split(" - ")[0]
    pieces = split_chunks(text)

    async def synthesize_all():
        # Chunks are processed sequentially so audio and subtitles stay in
        # document order.
        # NOTE(review): subtitle timings are appended as returned for each
        # chunk; if they restart at zero per chunk, entries after the first
        # chunk would not line up with the concatenated audio - confirm.
        audio_parts = []
        cues = []
        for piece in pieces:
            piece_audio, piece_cues = await tts_chunk(piece, short_name, rate, pitch)
            audio_parts.append(piece_audio)
            cues.extend(piece_cues)
        return b"".join(audio_parts), cues

    return asyncio.run(synthesize_all())
|
|
async def tts_comunicate(chunk, voice, rate, pitch):
    """Stream one chunk through edge-tts, collecting audio and word timings.

    Args:
        chunk: Text to synthesize.
        voice: edge-tts voice short name.
        rate: Speech-rate change in percent; sent as e.g. ``"+10%"``.
        pitch: Pitch change in Hz; sent as e.g. ``"-20Hz"``.

    Returns:
        ``(audio_buf, words_data)`` where ``audio_buf`` is a ``bytearray`` of
        the streamed audio and ``words_data`` is a list of
        ``(start, end, text)`` tuples, one per "WordBoundary" event.
    """
    # ``int()`` makes float inputs (e.g. 10.0 from a slider) acceptable; the
    # ``+d`` format spec raises ValueError for floats.
    comm = edge_tts.Communicate(
        chunk,
        voice,
        rate=f"{int(rate):+d}%",
        pitch=f"{int(pitch):+d}Hz",
        boundary="WordBoundary",
    )
    audio_buf = bytearray()
    words_data = []

    async for event in comm.stream():
        kind = event["type"]
        if kind == "audio":
            audio_buf.extend(event["data"])
        elif kind == "WordBoundary":
            # offset/duration are divided by 10_000 - presumably converting
            # 100 ns ticks to milliseconds (TODO confirm against edge-tts).
            start = int(event["offset"] / 10_000)
            end = start + int(event["duration"] / 10_000)
            words_data.append((start, end, event["text"]))
    return audio_buf, words_data
|
|
async def tts_chunk(chunk, voice, rate, pitch):
    """Synthesize one chunk and group its timed words into subtitle lines.

    The word texts are grouped into short lines by ``group_words``; each line
    is then mapped back onto the timed words by counting the
    whitespace-separated tokens in the line. This assumes every word text is
    exactly one such token.

    Returns:
        ``(audio_bytes, subs)`` where ``subs`` is a list of dicts with keys
        ``"start"``, ``"end"``, ``"text"`` and ``"words"`` (the
        ``(start, end, text)`` tuples belonging to that line).
    """
    audio, timed_words = await tts_comunicate(chunk, voice, rate, pitch)
    lines = group_words([entry[2] for entry in timed_words])

    total = len(timed_words)
    cursor = 0
    cues = []
    for line in lines:
        stop = cursor + len(line.split())
        if stop > total:
            # The token count ran past the available words: attach whatever
            # is left, time the line by the final word, and stop.
            final_word = timed_words[-1]
            cues.append({
                "start": final_word[0],
                "end": final_word[1],
                "text": line,
                "words": timed_words[cursor:],
            })
            break
        cues.append({
            "start": timed_words[cursor][0],
            "end": timed_words[stop - 1][1],
            "text": line,
            "words": timed_words[cursor:stop],
        })
        cursor = stop

    return bytes(audio), cues
|
|
def split_chunks(text, max_bytes=2*1024*1024):
    """Split ``text`` into chunks of whole paragraphs within a byte budget.

    Paragraphs (separated by a blank line, ``"\\n\\n"``) are packed greedily:
    a paragraph is added to the current chunk while the chunk's UTF-8 size
    stays within ``max_bytes``; otherwise a new chunk is started. A single
    paragraph larger than ``max_bytes`` is kept whole as its own chunk.

    Args:
        text: Input text; a leading byte-order mark (U+FEFF) is dropped.
        max_bytes: Maximum UTF-8 size of a chunk built from several
            paragraphs.

    Returns:
        List of stripped, non-empty chunks in document order. Whitespace-only
        chunks are omitted so that no empty string is sent for synthesis.
    """
    if text.startswith('\ufeff'):
        text = text[1:]

    raw_chunks = []
    parts = []  # paragraphs of the chunk being built
    size = 0    # UTF-8 size of "\n\n".join(parts); 0 means nothing yet

    for para in text.split("\n\n"):
        para_size = len(para.encode('utf-8'))
        if not size:
            # Nothing accumulated so far: the paragraph starts the chunk.
            parts, size = [para], para_size
            continue
        # Tracking the size incrementally avoids re-encoding the whole
        # accumulated chunk for every paragraph.
        joined_size = size + 2 + para_size  # 2 bytes for the "\n\n" joiner
        if joined_size > max_bytes:
            raw_chunks.append("\n\n".join(parts))
            parts, size = [para], para_size
        else:
            parts.append(para)
            size = joined_size

    if size:
        raw_chunks.append("\n\n".join(parts))

    stripped = (chunk.strip() for chunk in raw_chunks)
    return [chunk for chunk in stripped if chunk]
|
|
def group_words(words, max_length=15):
    """Pack consecutive words into space-joined lines of limited length.

    Each word costs ``len(word) + 1`` (the word plus one separator). Words
    are added to the current line while the total cost stays within
    ``max_length``; a word that does not fit starts a new line. A word that
    alone exceeds the limit still gets a line of its own.

    Args:
        words: Iterable of word strings.
        max_length: Cost budget per line.

    Returns:
        List of lines, each a ``" "``-joined run of the input words.
    """
    lines = []
    pending = []
    used = 0
    for token in words:
        cost = len(token) + 1
        if used + cost > max_length:
            # Close the current line (if any) before starting a new one.
            if pending:
                lines.append(" ".join(pending))
            pending = []
            used = 0
        pending.append(token)
        used += cost
    if pending:
        lines.append(" ".join(pending))
    return lines
|
|
def create_word_highlight(words, current_start, current_end):
    """Render a line of words with the currently spoken word recoloured.

    Args:
        words: Sequence of ``(start, end, text)`` tuples forming one line.
        current_start: Start time of the word to highlight.
        current_end: End time of the word to highlight.

    Returns:
        The word texts joined by single spaces; every word whose start and
        end both equal the given times is wrapped in ASS colour override
        tags (``\\c&H00FFFF&`` before, ``\\c&HFFFFFF&`` after).
    """
    def render(start, end, word):
        if start == current_start and end == current_end:
            return f"{{\\c&H00FFFF&}}{word}{{\\c&HFFFFFF&}}"
        return word

    return " ".join(render(start, end, word) for start, end, word in words)
|
|
|
|
def add_karaoke_subtitles(subs, words):
    """Append one subtitle event per word, highlighting that word.

    Every event shows the full line (all ``words``) with the current word
    recoloured. An event lasts from its word's start until the next word's
    start; the last event ends at its own word's end.

    Args:
        subs: Subtitle container the events are appended to.
        words: List of ``(start, end, text)`` tuples forming one line.
            Nothing is appended when it is empty.
    """
    if not words:
        return

    # End time of each event: the following word's start, or for the final
    # word its own end.
    event_ends = [following[0] for following in words[1:]]
    event_ends.append(words[-1][1])

    for (start, end, _text), event_end in zip(words, event_ends):
        line = create_word_highlight(words, start, end)
        subs.append(pysubs2.SSAEvent(start=start, end=event_end, text=line))
|
|
|
|
def base64_encode(s):
    """Return the base64 text of ``s`` encoded as UTF-8."""
    utf8_bytes = s.encode("utf-8")
    encoded = base64.b64encode(utf8_bytes)
    return encoded.decode("utf-8")
|
|
def _publish_outputs(webhook, audio_path, subtitle_path):
    """Upload both result files and POST their URLs to ``webhook``.

    For each file an upload URL is requested from the g86.xyz upload API
    (keyed by the file's base name) and the file is PUT there. The webhook
    then receives a JSON payload with the ``http://s3.g86.xyz/<name>`` URLs.

    Raises:
        requests.RequestException: An upload step failed or timed out.
    """
    for path in (audio_path, subtitle_path):
        # os.path.basename handles the platform's path separator, unlike
        # splitting on "/" only.
        key = os.path.basename(path)
        ticket = requests.get('https://g86.xyz/api/upload', params={"k": key}, timeout=10)
        ticket.raise_for_status()
        upload_url = ticket.text
        print('upload to:', upload_url)

        mime = "audio/mpeg" if path.endswith(".mp3") else "text/plain"
        with open(path, "rb") as fh:
            put = requests.put(upload_url, data=fh, headers={"Content-Type": mime}, timeout=60)
        put.raise_for_status()

    dm = 'http://s3.g86.xyz'
    payload = {
        "data": {
            "audio": f'{dm}/{os.path.basename(audio_path)}',
            "subtitle": f'{dm}/{os.path.basename(subtitle_path)}',
        },
        "status": "success",
    }
    print('output: ', payload)

    # The webhook's response status is not checked.
    requests.post(webhook, json=payload, timeout=10)


def speech(text, voice, rate, pitch, webhook=None):
    """Turn the user's input into an audio file and an ASS subtitle file.

    Args:
        text: Plain text, a base64-encoded UTF-8 string, or an ``https://``
            link (Google Docs links are downloaded as plain text).
        voice: Voice label as offered by the dropdown.
        rate: Speech-rate change in percent.
        pitch: Pitch change in Hz.
        webhook: Optional URL; when given, both files are uploaded and their
            URLs are POSTed to it.

    Returns:
        ``(audio_path, subtitle_path)``: paths of the two temporary files
        (``.mp3`` and ``.txt``). They are created with ``delete=False`` and
        are not removed by this function.

    Raises:
        ValueError: No text remains after preprocessing, or no voice given.
    """
    text = (text or "").strip()

    if text:
        text = emoji.replace_emoji(text, replace='')

    if text.startswith("https://"):
        text = text_gdoc(text)
    else:
        text = decode_base64(text)

    # The first pass only cleaned the raw input (possibly a URL or base64);
    # clean the resolved content too so fetched/decoded emoji are removed.
    if text:
        text = emoji.replace_emoji(text, replace='').strip()

    if not text:
        raise ValueError("Please enter text")
    if not voice:
        raise ValueError("Please select a voice")

    print('text: ', text[:10], '...')

    subs = config_subs()
    audio_bytes, sub_data = render_audio(text, voice, rate, pitch)

    for sub in sub_data:
        if "words" in sub:
            add_karaoke_subtitles(subs, sub["words"])
        else:
            subs.append(pysubs2.SSAEvent(start=sub["start"], end=sub["end"], text=sub["text"]))

    srt_content = subs.to_string("ass")

    # delete=False: the paths are handed back to the caller after the files
    # are closed.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as audio_file:
        audio_file.write(audio_bytes)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="w", encoding="utf-8") as srt_file:
        srt_file.write(srt_content)

    if webhook:
        print('webhook', webhook)
        _publish_outputs(webhook, audio_file.name, srt_file.name)

    return audio_file.name, srt_file.name
|
|
| |
| |
| |
# Build the Gradio UI. ``voices`` maps dropdown labels to edge-tts short names.
voices = get_voices()
_voice_labels = list(voices)

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=3):
            gr.Markdown("# Text to Speech")
            gr.Markdown(ip_info)
        with gr.Column(scale=1):
            submit_btn = gr.Button("RUN", variant="primary")

    input_text = gr.Textbox(
        label="Input",
        lines=5,
        info="You can enter text or a Google Docs link",
        placeholder="Enter text or Google Docs link here...",
    )
    voice_dropdown = gr.Dropdown(
        choices=_voice_labels,
        label="Voice",
        # Preselect the first voice; with an empty catalogue fall back to no
        # selection instead of raising IndexError at import time.
        value=_voice_labels[0] if _voice_labels else None,
    )

    with gr.Accordion("Advanced Options", open=False):
        rate_in = gr.Slider(-100, 100, value=0, label="Speech Rate (%)", step=10)
        pitch_in = gr.Slider(-100, 100, value=0, label="Pitch (Hz)", step=10)
        webhook_in = gr.Textbox(
            label="Webhook URL (optional)",
            placeholder="Enter webhook URL if you want the output sent there...",
        )

    # The output components are created inline, so they render after the
    # inputs above.
    submit_btn.click(
        fn=speech,
        inputs=[input_text, voice_dropdown, rate_in, pitch_in, webhook_in],
        outputs=[
            gr.Audio(label="Generated Audio", autoplay=False, show_download_button=True, type="filepath"),
            gr.File(label="ASS Subtitle", interactive=False),
        ],
    )
|
|
# Start the web server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch(
        show_api=False,
        show_error=True,
    )
|
|