# m4b bug fix (commit de4756c) — header residue from hosting page converted to a comment
import gradio as gr
import edge_tts
import asyncio
import tempfile
import os
import re
import shutil
from pydub import AudioSegment
import math
import time
from datetime import datetime, timedelta
import logging
from text_cleaning import TextCleaner
# EPUB parsing
try:
import ebooklib
from ebooklib import epub
from bs4 import BeautifulSoup
EPUB_SUPPORT = True
except ImportError:
EPUB_SUPPORT = False
logging.warning("ebooklib or beautifulsoup4 not installed. EPUB support disabled.")
# Encoding detection
try:
import chardet
CHARDET_SUPPORT = True
except ImportError:
CHARDET_SUPPORT = False
logging.warning("chardet not installed. Encoding detection will use fallback method.")
# Configure logging: a single stream handler; INFO level surfaces the
# per-segment progress messages emitted throughout this module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def detect_file_encoding(file_path):
    """Detect file encoding using chardet or fallback method"""
    if not CHARDET_SUPPORT:
        # No chardet installed; the caller falls back to trying a
        # fixed list of common encodings instead.
        return None
    with open(file_path, 'rb') as f:
        raw = f.read()
    guess = chardet.detect(raw)
    encoding = guess['encoding']
    confidence = guess['confidence']
    logger.info(f"Detected encoding: {encoding} (confidence: {confidence:.2%})")
    if encoding:
        # Normalize common aliases to names Python's codecs handle well.
        alias_table = {
            'gb2312': 'gbk',          # GBK is a superset of GB2312
            'gb18030': 'gb18030',
            'ascii': 'utf-8',         # ASCII is a subset of UTF-8
            'iso-8859-1': 'latin-1',
            'windows-1252': 'cp1252',
        }
        encoding = alias_table.get(encoding.lower(), encoding)
    return encoding
def read_text_file_with_encoding(file_path):
    """Read text file with automatic encoding detection"""
    # Candidate order: chardet's guess (may be None) first, then common
    # encodings — CJK-heavy since this app mainly processes book text.
    candidates = [detect_file_encoding(file_path)] + [
        'utf-8',
        'utf-8-sig',   # UTF-8 with BOM
        'gbk',         # Chinese (simplified)
        'gb18030',     # Chinese (extended)
        'big5',        # Chinese (traditional)
        'utf-16',
        'latin-1',     # Western European
        'cp1252',      # Windows Western
        'shift_jis',   # Japanese
        'euc-kr',      # Korean
    ]
    # Drop Nones and case-insensitive duplicates, keeping priority order.
    seen = set()
    ordered = []
    for candidate in candidates:
        if candidate and candidate.lower() not in seen:
            seen.add(candidate.lower())
            ordered.append(candidate)
    last_error = None
    for encoding in ordered:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()
        except (UnicodeDecodeError, LookupError) as e:
            last_error = e
            logger.debug(f"Failed to decode with {encoding}: {e}")
            continue
        # Reject decodes where >10% of characters are U+FFFD replacements.
        if content.count('\ufffd') > len(content) * 0.1:
            logger.debug(f"Encoding {encoding} produced too many replacement characters, trying next...")
            continue
        logger.info(f"Successfully read file with encoding: {encoding}")
        return content
    logger.error(f"Failed to decode file with any encoding. Last error: {last_error}")
    return None
def parse_uploaded_file(file_path):
    """Parse an uploaded .txt or .epub file into plain text.

    Args:
        file_path: Path to the uploaded file, or None.

    Returns:
        Tuple of (text, filename): text is the extracted content or None on
        failure; filename is the base name without extension (None only when
        no file was given or the extension is unsupported).
    """
    if file_path is None:
        return None, None
    filename = os.path.splitext(os.path.basename(file_path))[0]
    ext = os.path.splitext(file_path)[1].lower()
    if ext == '.txt':
        text = read_text_file_with_encoding(file_path)
        if text:
            # Bug fix: log the actual file name (messages previously
            # contained the literal placeholder "(unknown)").
            logger.info(f"Parsed TXT file: {filename}, {len(text)} chars")
            return text, filename
        else:
            logger.error(f"Failed to decode TXT file: {filename}")
            return None, filename
    elif ext == '.epub':
        if not EPUB_SUPPORT:
            logger.error("EPUB support not available")
            return None, filename
        try:
            book = epub.read_epub(file_path)
            text_parts = []
            # Only ITEM_DOCUMENT items carry chapter HTML; images, CSS and
            # metadata items are skipped.
            for item in book.get_items():
                if item.get_type() == ebooklib.ITEM_DOCUMENT:
                    soup = BeautifulSoup(item.get_content(), 'html.parser')
                    text_parts.append(soup.get_text(separator='\n'))
            text = '\n\n'.join(text_parts)
            logger.info(f"Parsed EPUB file: {filename}, {len(text)} chars")
            return text, filename
        except Exception as e:
            logger.error(f"Failed to parse EPUB: {e}")
            return None, filename
    # Unsupported extension.
    return None, None
async def convert_to_m4b(mp3_path, output_filename):
    """Convert an MP3 file to M4B using ffmpeg directly (supports large files).

    Args:
        mp3_path: Path to the source MP3 file.
        output_filename: Unused here; kept for interface compatibility (the
            caller renames the returned temp file itself).

    Returns:
        Path to the converted .m4b temp file, or None on any failure
        (ffmpeg missing, non-zero exit, timeout, or unexpected error).
    """
    # Bug fix: import moved out of the try block — the except clauses below
    # reference subprocess.TimeoutExpired, which would raise NameError if
    # the import itself had been what failed inside the try.
    import subprocess
    try:
        m4b_path = tempfile.NamedTemporaryFile(delete=False, suffix=".m4b").name
        # Use ffmpeg directly: pydub goes through a WAV intermediate, which
        # imposes a 4GB size limit on long audiobooks.
        cmd = [
            'ffmpeg', '-y',   # overwrite output without prompting
            '-i', mp3_path,   # input file
            '-c:a', 'aac',    # audio codec
            '-b:a', '128k',   # audio bitrate
            '-f', 'ipod',     # container format used for M4B/M4A
            m4b_path
        ]
        logger.info(f"Running ffmpeg conversion: {' '.join(cmd)}")
        # NOTE(review): subprocess.run blocks the event loop for the whole
        # conversion; asyncio.create_subprocess_exec would avoid that.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=3600  # 1 hour timeout for large files
        )
        if result.returncode != 0:
            logger.error(f"ffmpeg error: {result.stderr}")
            if os.path.exists(m4b_path):
                os.remove(m4b_path)
            return None
        logger.info(f"Converted to M4B: {m4b_path}")
        return m4b_path
    except FileNotFoundError:
        logger.error("ffmpeg not found. Please install ffmpeg to use M4B format.")
        return None
    except subprocess.TimeoutExpired:
        logger.error("ffmpeg conversion timed out")
        return None
    except Exception as e:
        logger.error(f"Failed to convert to M4B: {e}")
        return None
async def get_voices():
    """Fetch the available Edge TTS voices (network call).

    Returns a dict mapping a display label "ShortName - Locale (Gender)"
    to the voice ShortName, which is the value edge_tts.Communicate expects.
    """
    voices = await edge_tts.list_voices()
    return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
def format_time_remaining(seconds):
    """Format a duration in seconds as a short human-readable string."""
    # Hours take precedence, then minutes, then whole seconds.
    if seconds >= 3600:
        return f"{seconds / 3600:.1f}h"
    if seconds >= 60:
        return f"{seconds / 60:.1f}m"
    return f"{int(seconds)}s"
def calculate_eta(start_time, completed_items, total_items):
    """Estimate the remaining processing time from progress so far."""
    if not completed_items:
        # Nothing finished yet — no rate to extrapolate from.
        return "Calculating..."
    elapsed = time.time() - start_time
    # Average time per completed item, extrapolated over what's left.
    per_item = elapsed / completed_items
    remaining = total_items - completed_items
    return format_time_remaining(per_item * remaining)
def estimate_text_duration(text):
    """Estimate speech duration in minutes from text length.

    Heuristic: space-separated text (English-like) is spoken at roughly
    150 words/minute; text with few spaces (Chinese-like) at roughly
    300 characters/minute. The ratio of spaces to total length decides
    which model applies.
    """
    if not text:
        return 0
    total_len = len(text)
    space_ratio = text.count(' ') / total_len
    if space_ratio < 0.1:
        # Few spaces → assume CJK-style text, ~300 chars per minute.
        return total_len / 300
    # Space-separated text → ~150 words per minute.
    return len(text.split()) / 150
def split_text_by_paragraphs(text, max_duration_minutes=5, max_chars=500):
    """Split text into speech-sized segments within duration and size limits.

    Segments are bounded both by estimated speech duration
    (max_duration_minutes, via estimate_text_duration) and by raw character
    count (max_chars). Paragraphs (blank-line separated) are packed greedily
    into segments; a paragraph that alone exceeds a limit is further split
    on sentence-ending punctuation (Latin and Chinese: . ! ? 。 ! ?).

    Returns:
        list[str]: stripped segments in original order; [text] unchanged if
        the whole input already fits both limits.
    """
    max_duration = max_duration_minutes
    estimated_duration = estimate_text_duration(text)
    logger.info(f"Checking segmentation: Duration={estimated_duration:.2f}m, Chars={len(text)}, Limit={max_duration}m/{max_chars}chars")
    # Fast path: no splitting needed.
    if estimated_duration <= max_duration and len(text) <= max_chars:
        return [text]
    logger.info(f"Text exceeds limits. Splitting...")
    # Split by paragraphs first (blank-line separated).
    paragraphs = text.split('\n\n')
    segments = []
    current_segment = ""
    for paragraph in paragraphs:
        paragraph_duration = estimate_text_duration(paragraph)
        # If a single paragraph is too long, split it by sentences. The
        # regex *captures* the punctuation so it can be re-attached below.
        if paragraph_duration > max_duration or len(paragraph) > max_chars:
            sentences = re.split(r'([.!?。!?]+)', paragraph)
            # re.split with a capture group alternates [text, delim, text,
            # delim, ...]; pair each text chunk with its punctuation.
            real_sentences = []
            for i in range(0, len(sentences) - 1, 2):
                real_sentences.append(sentences[i] + sentences[i+1])
            # Odd length means a trailing chunk with no delimiter; keep it.
            if len(sentences) % 2 == 1 and sentences[-1]:
                real_sentences.append(sentences[-1])
            for sentence in real_sentences:
                sentence = sentence.strip()
                if not sentence:
                    continue
                # Close the running segment if adding this sentence would
                # overflow either the duration or the character budget.
                if (estimate_text_duration(current_segment + sentence) > max_duration or
                    len(current_segment + sentence) > max_chars) and current_segment:
                    segments.append(current_segment.strip())
                    current_segment = sentence
                else:
                    current_segment += sentence
        else:
            # Whole paragraph fits on its own; pack it greedily, keeping
            # the paragraph break so merged text stays readable.
            if (estimate_text_duration(current_segment + paragraph) > max_duration or
                len(current_segment + paragraph) > max_chars) and current_segment:
                segments.append(current_segment.strip())
                current_segment = paragraph + "\n\n"
            else:
                current_segment += paragraph + "\n\n"
    # Flush the final partial segment.
    if current_segment.strip():
        segments.append(current_segment.strip())
    logger.info(f"Split text into {len(segments)} segments.")
    return segments
import io
async def generate_audio_segment(text_segment, voice_short_name, rate_str, volume_str, pitch_str, segment_index):
    """Generate audio for a single text segment and save it to a temp file.

    Args:
        text_segment: Text for this chunk.
        voice_short_name: edge_tts voice ShortName.
        rate_str / volume_str / pitch_str: Pre-formatted strings such as
            "+0%" / "+0%" / "+0Hz", as built by the caller.
        segment_index: 1-based index, used for logging and the file suffix.

    Returns:
        Path to the generated MP3 temp file; the caller owns its deletion.

    Raises:
        gr.Error: if edge_tts fails — the partial temp file is removed first.
    """
    logger.info(f"Generating segment {segment_index}...")
    communicate = edge_tts.Communicate(text_segment, voice_short_name, rate=rate_str, volume=volume_str, pitch=pitch_str)
    # Stream straight to a temp file instead of buffering in memory.
    # The handle is closed immediately so edge_tts can write to the path.
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f"_seg{segment_index}.mp3")
    tmp_path = tmp_file.name
    tmp_file.close()
    try:
        await communicate.save(tmp_path)
    except Exception as e:
        logger.error(f"Error generating segment {segment_index} (Length: {len(text_segment)} chars): {e}")
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise gr.Error(f"Error generating segment {segment_index}: {e}")
    # Best-effort duration check for the logs; failures here are non-fatal.
    try:
        seg_audio = AudioSegment.from_mp3(tmp_path)
        duration_min = len(seg_audio) / 1000 / 60
        logger.info(f"Segment {segment_index} saved to temp file (Duration: {duration_min:.2f} min)")
    except Exception as e:
        logger.error(f"Error checking segment {segment_index} duration: {e}")
    return tmp_path
async def merge_audio_files(audio_paths):
    """Merge multiple audio files into one file using binary concatenation"""
    if not audio_paths:
        return None
    logger.info(f"Merging {len(audio_paths)} audio segments...")
    # Allocate the destination file up front; only its path is needed.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        merged_path = tmp_file.name
    # Raw byte concatenation of the MP3 segments — sidesteps pydub's WAV
    # intermediate and its file-size ceiling.
    total_size = 0
    with open(merged_path, 'wb') as outfile:
        for index, segment_path in enumerate(audio_paths):
            try:
                with open(segment_path, 'rb') as infile:
                    payload = infile.read()
                outfile.write(payload)
                total_size += len(payload)
                # Segment is merged; reclaim its disk space immediately.
                os.remove(segment_path)
                logger.info(f"Merged and deleted segment {index+1}")
            except Exception as e:
                logger.error(f"Error merging segment {index+1}: {e}")
    logger.info(f"Merged audio saved to {merged_path} (Total size: {total_size / 1024 / 1024:.2f} MB)")
    return merged_path
async def text_to_speech_generator(text, voice, rate, volume, pitch, cleaning_options=None, output_format="mp3", output_filename=None):
    """Generate speech with detailed progress tracking via an async generator.

    Yield protocol (consumed by tts_interface): every yield is a 3-tuple.
    While working, the first element is a numeric progress value (0-100) and
    the second a status message. The final yield carries the audio file path
    in the first position and "Done" in the second. The third element is
    segment-summary text for long texts, or None.

    Args:
        text: Input text to synthesize.
        voice: Dropdown label "ShortName - Locale (Gender)".
        rate / volume: Integer percent offsets. pitch: integer Hz offset.
        cleaning_options: Dict of TextCleaner flags, or None to skip cleaning.
        output_format: "mp3" (default) or "m4b" (requires ffmpeg).
        output_filename: Base name for the result; timestamped fallback used
            when None.
    """
    if not text.strip():
        yield None, "Please enter text to convert.", None
        return
    if not voice:
        yield None, "Please select a voice.", None
        return
    # Apply text cleaning if enabled.
    if cleaning_options and cleaning_options.get('enable_cleaning', False):
        yield 0, "Cleaning text...", None
        text = TextCleaner.clean_text(text, cleaning_options)
        if cleaning_options.get('save_cleaned', False):
            # Timestamped filename for the cleaned-text artifact.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"text_{timestamp}.txt"
            saved_path = TextCleaner.save_cleaned_text(text, filename)
            if saved_path:
                logger.info(f"Saved cleaned text to {saved_path}")
    # Re-check after cleaning: aggressive cleaning can empty the text.
    if not text.strip():
        yield None, "Text cleaning resulted in empty text.", None
        return
    # The dropdown label is "ShortName - Locale (Gender)"; edge_tts needs
    # only the ShortName. Rate/volume/pitch become signed strings ("+5%").
    voice_short_name = voice.split(" - ")[0]
    rate_str = f"{rate:+d}%"
    volume_str = f"{volume:+d}%"
    pitch_str = f"{pitch:+d}Hz"
    # Check if text is too long and needs segmentation.
    estimated_duration = estimate_text_duration(text)
    yield 0, "Starting text processing...", None
    logger.info(f"Starting TTS for text with estimated duration: {estimated_duration:.2f}m")
    # Generate output filename with timestamp.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    if output_filename:
        final_filename = f"{output_filename}_{timestamp}"
    else:
        final_filename = f"audio_{timestamp}"
    final_audio_path = None
    if estimated_duration > 15:  # If longer than 15 minutes, split into segments
        segments = split_text_by_paragraphs(text)
        total_segments = len(segments)
        segment_info = f"Text split into {total_segments} segments. Total estimated duration: {estimated_duration:.1f} min"
        yield 5, segment_info, segment_info
        # If splitting produced a single segment, fall through to the
        # short-text path below instead.
        if total_segments > 1:
            # Generate audio for each segment with progress tracking.
            # NOTE: despite the name, audio_objects holds temp-file *paths*.
            audio_objects = []
            start_time = time.time()
            for i, segment in enumerate(segments):
                if segment.strip():
                    segment_duration = estimate_text_duration(segment)
                    progress = 10 + (80 * i / total_segments)  # 10% to 90%
                    eta = calculate_eta(start_time, i, total_segments)
                    status_msg = (
                        f"Generating segment {i+1}/{total_segments}...\n"
                        f"Segment duration: {segment_duration:.1f} min\n"
                        f"ETA: {eta}"
                    )
                    # chr(10) is '\n': flatten the multi-line status for the log.
                    logger.info(f"Progress: {status_msg.replace(chr(10), ', ')}")
                    yield progress, status_msg, segment_info
                    audio_obj = await generate_audio_segment(
                        segment, voice_short_name, rate_str, volume_str, pitch_str, i+1
                    )
                    audio_objects.append(audio_obj)
            yield 90, "Merging audio files...", segment_info
            # Merge all segment files into one MP3.
            merged_audio_path = await merge_audio_files(audio_objects)
            final_audio_path = merged_audio_path
            # Convert to M4B if requested; keep the MP3 if conversion fails.
            if output_format == "m4b" and merged_audio_path:
                yield 95, "Converting to M4B format...", segment_info
                m4b_path = await convert_to_m4b(merged_audio_path, final_filename)
                if m4b_path:
                    os.remove(merged_audio_path)
                    final_audio_path = m4b_path
            # Rename the temp file to the user-facing final filename.
            if final_audio_path:
                ext = ".m4b" if output_format == "m4b" else ".mp3"
                new_path = os.path.join(os.path.dirname(final_audio_path), f"{final_filename}{ext}")
                shutil.move(final_audio_path, new_path)
                final_audio_path = new_path
            yield 100, "Audio generation complete! ✅", segment_info
            yield final_audio_path, "Done", segment_info
            return
    # For short texts or a single segment, use the original one-shot method.
    yield 50, "Generating audio...", None
    logger.info("Generating single segment audio...")
    communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, volume=volume_str, pitch=pitch_str)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        tmp_path = tmp_file.name
        await communicate.save(tmp_path)
    final_audio_path = tmp_path
    # Convert to M4B if requested; keep the MP3 if conversion fails.
    if output_format == "m4b":
        yield 80, "Converting to M4B format...", None
        m4b_path = await convert_to_m4b(tmp_path, final_filename)
        if m4b_path:
            os.remove(tmp_path)
            final_audio_path = m4b_path
    # Rename the temp file to the user-facing final filename.
    if final_audio_path:
        ext = ".m4b" if output_format == "m4b" else ".mp3"
        new_path = os.path.join(os.path.dirname(final_audio_path), f"{final_filename}{ext}")
        shutil.move(final_audio_path, new_path)
        final_audio_path = new_path
    logger.info(f"Audio generated at {final_audio_path}")
    yield 100, "Audio generation complete! ✅", None
    yield final_audio_path, "Done", None
async def tts_interface(text, uploaded_file, voice, rate, volume, pitch, output_format,
                        enable_cleaning, save_cleaned, clean_urls, clean_html,
                        clean_markdown, clean_ads, fix_enc, tidy_ws, del_gutenberg,
                        del_special, wetext_norm):
    """Gradio event handler bridging the UI and text_to_speech_generator.

    Async generator: every yield is a 4-tuple matching the click() outputs —
    (audio file path or None, progress-markdown gr.update, status text,
    segment-details gr.update).
    """
    # Derive the output filename from the uploaded file's base name (if any).
    output_filename = None
    if uploaded_file is not None:
        output_filename = os.path.splitext(os.path.basename(uploaded_file))[0]
        logger.info(f"Using filename from uploaded file: {output_filename}")
    if not text.strip():
        yield None, gr.update(visible=True, value="Please enter text or upload a file."), "No text provided", gr.update(visible=False)
        return
    if not voice:
        yield None, gr.update(visible=False), "Please select a voice.", gr.update(visible=False)
        return
    # Map the checkbox states onto the option keys TextCleaner understands.
    cleaning_options = {
        'enable_cleaning': enable_cleaning,
        'save_cleaned': save_cleaned,
        'remove_urls': clean_urls,
        'remove_html': clean_html,
        'remove_markdown': clean_markdown,
        'filter_ads': clean_ads,
        'fix_encoding': fix_enc,
        'tidy_whitespace': tidy_ws,
        'remove_gutenberg': del_gutenberg,
        'remove_special_chars': del_special,
        'wetext_normalization': wetext_norm
    }
    # NOTE(review): working_text and estimated_duration below are computed
    # but never used — the generator cleans the text again itself, so this
    # pre-cleaning pass is redundant work. Kept as-is to preserve behavior.
    working_text = text
    if enable_cleaning:
        working_text = TextCleaner.clean_text(text, cleaning_options)
        if save_cleaned:
            # Saving is deferred to the generator to avoid double-saving.
            pass
    estimated_duration = estimate_text_duration(working_text)
    # Reset the UI before streaming progress updates.
    yield None, gr.update(value="Starting...", visible=True), "Initializing...", gr.update(visible=False)
    async for result in text_to_speech_generator(text, voice, rate, volume, pitch, cleaning_options, output_format, output_filename):
        if isinstance(result, tuple) and len(result) == 3:
            progress_val, status_msg, segment_info = result
            if isinstance(progress_val, (int, float)):
                # Numeric first element → in-flight progress update.
                segment_update = gr.update(value=segment_info, visible=True) if segment_info else gr.update(visible=False)
                yield None, gr.update(value=status_msg, visible=True), status_msg, segment_update
            else:
                # Non-numeric first element → final (audio_path, msg, info).
                audio_path = progress_val
                yield audio_path, gr.update(value="Complete!", visible=True), "Generation Complete", gr.update(visible=True)
async def create_demo():
    """Build the Gradio Blocks UI and wire up its event handlers.

    Fetches the live voice list (network call), then lays out the input
    column (text box, file upload, cleaning options, voice/rate/volume/pitch
    controls, format selector) and the output column (audio player plus
    progress/status/segment panels).

    Returns:
        gr.Blocks: the assembled, not-yet-launched demo.
    """
    voices = await get_voices()
    description = """
    Convert text to speech using Microsoft Edge TTS. Adjust speech rate and pitch: 0 is default, positive values increase, negative values decrease.
    🎥 **Exciting News: Introducing our Text-to-Video Converter!** 🎥
    Take your content creation to the next level with our cutting-edge Text-to-Video Converter!
    Transform your words into stunning, professional-quality videos in just a few clicks.
    ✨ Features:
    • Convert text to engaging videos with customizable visuals
    • Choose from 40+ languages and 300+ voices
    • Perfect for creating audiobooks, storytelling, and language learning materials
    • Ideal for educators, content creators, and language enthusiasts
    📝 **Long Text Support**:
    Texts longer than 15 minutes will be **automatically segmented** into smaller chunks for processing and then **merged back** into a single high-quality audio file. This ensures stability and allows for unlimited text length!
    """
    # Pre-select the Xiaoxiao voice when available; empty string otherwise.
    default_voice = ""
    for voice_key in voices.keys():
        if "XiaoxiaoNeural" in voice_key:
            default_voice = voice_key
            break
    with gr.Blocks(title="Edge TTS Text-to-Speech") as demo:
        gr.Markdown("# Edge TTS Text-to-Speech")
        gr.Markdown(description)
        with gr.Row():
            # --- Left column: inputs and generation controls ---
            with gr.Column():
                text_input = gr.Textbox(label="Input Text", lines=8, placeholder="Enter your text here... Long texts will be automatically segmented if they exceed 15 minutes of speech time.")
                # File upload component (TXT/EPUB).
                file_upload = gr.File(
                    label="Or Upload File (TXT/EPUB)",
                    file_types=[".txt", ".epub"],
                    type="filepath"
                )
                # Live text-analysis summary shown under the inputs.
                text_info = gr.Markdown("**Text Analysis**: Enter text or upload a file to see estimated duration and segment count", visible=True)
                with gr.Accordion("Text Cleaning Settings", open=True):
                    with gr.Row():
                        enable_cleaning = gr.Checkbox(label="Enable Text Cleaning", value=True)
                        save_cleaned = gr.Checkbox(label="Save Cleaned Text File", value=True)
                    # Individual cleaning toggles, hidden when cleaning is off.
                    with gr.Group(visible=True) as cleaning_options_group:
                        with gr.Row():
                            clean_urls = gr.Checkbox(label="Remove URLs", value=True)
                            clean_html = gr.Checkbox(label="Remove HTML", value=True)
                        with gr.Row():
                            clean_markdown = gr.Checkbox(label="Remove Markdown", value=True)
                            clean_ads = gr.Checkbox(label="Filter Ads", value=True)
                        with gr.Row():
                            fix_enc = gr.Checkbox(label="Fix Encoding", value=True)
                            tidy_ws = gr.Checkbox(label="Tidy Whitespace", value=True)
                        with gr.Row():
                            del_gutenberg = gr.Checkbox(label="Remove Project Gutenberg", value=True)
                            del_special = gr.Checkbox(label="Remove Special Characters", value=True)
                        with gr.Row():
                            wetext_norm = gr.Checkbox(label="Enable WeText Normalization", value=True)
                    def toggle_options(enabled):
                        # Show/hide the detailed cleaning options with the master switch.
                        return gr.update(visible=enabled)
                    enable_cleaning.change(fn=toggle_options, inputs=[enable_cleaning], outputs=[cleaning_options_group])
                voice_dropdown = gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Voice", value=default_voice)
                with gr.Row():
                    rate_slider = gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate (%)", step=1)
                    volume_slider = gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Volume (%)", step=1)
                    pitch_slider = gr.Slider(minimum=-20, maximum=20, value=0, label="Pitch (Hz)", step=1)
                # Output format selection.
                output_format = gr.Radio(
                    choices=["mp3", "m4b"],
                    value="mp3",
                    label="Output Format",
                    info="MP3 is default. M4B is audiobook format (requires ffmpeg)."
                )
                generate_btn = gr.Button("Generate Audio", variant="primary")
            # --- Right column: generated audio and progress panels ---
            with gr.Column():
                audio_output = gr.Audio(label="Generated Audio", type="filepath")
                # Progress and status display.
                with gr.Group():
                    gr.Markdown("### 📊 Processing Progress")
                    progress_info = gr.Markdown("Ready, click Generate to start...", visible=True)
                # Processing details.
                with gr.Accordion("🔍 Processing Details", open=True) as processing_details:
                    status_output = gr.Markdown("Waiting...", visible=True)
                # Segment information display (only meaningful for long texts).
                with gr.Accordion("📋 Segment Information", open=True) as segment_info:
                    segment_details = gr.Markdown("Segment details will appear here for long texts", visible=True)
        gr.Markdown("Experience the power of Edge TTS for text-to-speech conversion, and explore our advanced Text-to-Video Converter for even more creative possibilities!")
        # Text-analysis helper used by the text_input.change event.
        def analyze_text(text, uploaded_file):
            # If a file is uploaded, its parsed content takes precedence.
            if uploaded_file is not None:
                file_text, filename = parse_uploaded_file(uploaded_file)
                if file_text:
                    text = file_text
                else:
                    return f"**Text Analysis**: Failed to parse uploaded file"
            if not text or not text.strip():
                return "**Text Analysis**: Enter text or upload a file to see estimated duration and segment count"
            duration = estimate_text_duration(text)
            word_count = len(text.split())
            char_count = len(text)
            if duration > 15:
                segments = split_text_by_paragraphs(text)
                segment_count = len(segments)
                return f"**Text Analysis**: {word_count} words, {char_count} characters, ~{duration:.1f} minutes speech time, {segment_count} segments will be generated"
            else:
                return f"**Text Analysis**: {word_count} words, {char_count} characters, ~{duration:.1f} minutes speech time"
        # Handle file upload — preview content in the text box and refresh analysis.
        def on_file_upload(uploaded_file):
            if uploaded_file is None:
                return gr.update(), "**Text Analysis**: Enter text or upload a file to see estimated duration and segment count"
            file_text, filename = parse_uploaded_file(uploaded_file)
            if file_text:
                # Calculate the same analysis summary as analyze_text.
                duration = estimate_text_duration(file_text)
                word_count = len(file_text.split())
                char_count = len(file_text)
                if duration > 15:
                    segments = split_text_by_paragraphs(file_text)
                    segment_count = len(segments)
                    analysis = f"**Text Analysis**: {word_count} words, {char_count} characters, ~{duration:.1f} minutes speech time, {segment_count} segments will be generated"
                else:
                    analysis = f"**Text Analysis**: {word_count} words, {char_count} characters, ~{duration:.1f} minutes speech time"
                return gr.update(value=file_text), analysis
            else:
                return gr.update(), "**Text Analysis**: Failed to parse uploaded file"
        # Update the analysis whenever the text changes.
        text_input.change(
            fn=analyze_text,
            inputs=[text_input, file_upload],
            outputs=[text_info]
        )
        # Update the text box and analysis when a file is uploaded.
        file_upload.change(
            fn=on_file_upload,
            inputs=[file_upload],
            outputs=[text_input, text_info]
        )
        generate_btn.click(
            fn=tts_interface,
            inputs=[
                text_input, file_upload, voice_dropdown, rate_slider, volume_slider, pitch_slider,
                output_format, enable_cleaning, save_cleaned, clean_urls, clean_html,
                clean_markdown, clean_ads, fix_enc, tidy_ws, del_gutenberg,
                del_special, wetext_norm
            ],
            outputs=[audio_output, progress_info, status_output, segment_details]
        )
    return demo
async def main():
    """Build the Gradio demo and launch it with a bounded request queue."""
    demo = await create_demo()
    # Up to 5 concurrent generations; further requests wait in the queue.
    demo.queue(default_concurrency_limit=5)
    demo.launch(show_api=False)
# Script entry point: fetch the voice list and start the web UI.
if __name__ == "__main__":
    asyncio.run(main())