| |
| |
| """ |
| Spark-TTS by SparkAudio – Enhanced eBook Converter |
| Licensed under the Apache License, Version 2.0. |
| (See accompanying LICENSE file for details) |
| """ |
|
|
| import os |
| import torch |
| import soundfile as sf |
| import logging |
| import argparse |
| import platform |
| import subprocess |
| from datetime import datetime |
|
|
| import gradio as gr |
|
|
| |
| import re |
| import ebooklib |
| from ebooklib import epub |
| from bs4 import BeautifulSoup |
| import nltk |
| from nltk.tokenize import sent_tokenize |
|
|
| |
| from pydub import AudioSegment |
|
|
| |
| from tqdm import tqdm |
|
|
| os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "0" |
|
|
| |
| nltk.download('punkt') |
| nltk.download('punkt_tab') |
|
|
| |
| try: |
| from huggingface_hub import snapshot_download |
| if not os.path.exists("pretrained_models/Spark-TTS-0.5B"): |
| print("Downloading pretrained model from Hugging Face...") |
| snapshot_download("SparkAudio/Spark-TTS-0.5B", local_dir="pretrained_models/Spark-TTS-0.5B") |
| except ImportError: |
| print("huggingface_hub is not installed. Make sure the pretrained model is already available.") |
|
|
|
|
| |
| |
| |
|
|
| from cli.SparkTTS import SparkTTS |
| from sparktts.utils.token_parser import LEVELS_MAP_UI |
|
|
def initialize_model(model_dir="pretrained_models/Spark-TTS-0.5B", device=0):
    """Load the Spark-TTS model once at startup and return the instance.

    CUDA is used when available; macOS (Darwin) always falls back to CPU.
    """
    logging.info(f"Loading model from: {model_dir}")
    # macOS builds never use CUDA here, even if torch reports it.
    cuda_ok = platform.system() != "Darwin" and torch.cuda.is_available()
    if cuda_ok:
        target = torch.device(f"cuda:{device}")
        logging.info(f"Using CUDA device: {target}")
    else:
        target = torch.device("cpu")
        logging.info("GPU acceleration not available, using CPU")
    return SparkTTS(model_dir, target)
|
|
def run_tts(text, model, prompt_text=None, prompt_speech=None, gender=None, pitch=None, speed=None, save_dir="results"):
    """Synthesize *text* with the given Spark-TTS model and save the fragment.

    Returns the full path of the saved .wav file.
    """
    logging.info(f"Saving audio to: {save_dir}")
    # A prompt of fewer than two characters carries no usable information.
    if prompt_text is not None and len(prompt_text) < 2:
        prompt_text = None
    os.makedirs(save_dir, exist_ok=True)
    # Microsecond-resolution timestamp keeps fragment filenames unique.
    stamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    out_path = os.path.join(save_dir, f"{stamp}.wav")
    logging.info("Starting inference...")
    with torch.no_grad():
        wav = model.inference(text, prompt_speech, prompt_text, gender, pitch, speed)
    # Spark-TTS produces 16 kHz audio.
    sf.write(out_path, wav, samplerate=16000)
    logging.info(f"Audio saved at: {out_path}")
    return out_path
|
|
|
|
| |
| |
| |
|
|
def ensure_directory(directory_path):
    """Create *directory_path* (including parents) if it does not exist.

    Uses ``exist_ok=True`` instead of an exists-check so there is no race
    window between checking and creating the directory.
    """
    os.makedirs(directory_path, exist_ok=True)
|
|
def convert_to_epub(input_path, output_path):
    """Convert an eBook (mobi/pdf/etc.) to EPUB using Calibre's ebook-convert.

    Returns True on success, False when the conversion fails or when the
    ``ebook-convert`` executable is not installed / not on PATH.
    """
    try:
        # List form (shell=False) avoids shell interpretation of file names.
        subprocess.run(['ebook-convert', input_path, output_path], check=True)
        return True
    except subprocess.CalledProcessError as e:
        logging.error(f"ebook-convert failed: {e}")
        return False
    except FileNotFoundError as e:
        # Calibre is missing entirely; report cleanly instead of crashing.
        logging.error(f"ebook-convert not found: {e}")
        return False
|
|
def save_chapters_as_text(epub_path, chapters_dir):
    """Extract each HTML document from the EPUB into a plain-text chapter file.

    Documents whose text is empty after stripping markup are skipped.
    Returns the number of chapter files written.
    """
    book = epub.read_epub(epub_path)
    written = 0
    for doc in book.get_items():
        if doc.get_type() != ebooklib.ITEM_DOCUMENT:
            continue
        plain = BeautifulSoup(doc.get_content(), 'html.parser').get_text()
        if not plain.strip():
            continue
        out_path = os.path.join(chapters_dir, f"chapter_{written}.txt")
        with open(out_path, 'w', encoding='utf-8') as handle:
            handle.write(plain)
        written += 1
    return written
|
|
def create_chapter_labeled_book(ebook_file_path):
    """Convert the uploaded eBook into chapters saved as text files.

    Returns the directory containing the ``chapter_<n>.txt`` files.
    Raises RuntimeError (an Exception subclass, so existing ``except
    Exception`` callers still work) when the EPUB conversion fails.
    """
    working_dir = os.path.join(".", "Working_files")
    ensure_directory(working_dir)
    temp_epub = os.path.join(working_dir, "temp.epub")
    chapters_dir = os.path.join(working_dir, "chapters")
    ensure_directory(chapters_dir)
    if os.path.exists(temp_epub):
        os.remove(temp_epub)
    # Remove chapter files left over from a previous conversion; otherwise
    # they would be picked up and mixed into the new audiobook.
    for stale in os.listdir(chapters_dir):
        if stale.startswith("chapter_") and stale.endswith(".txt"):
            os.remove(os.path.join(chapters_dir, stale))
    if not convert_to_epub(ebook_file_path, temp_epub):
        raise RuntimeError("Failed to convert ebook to EPUB.")
    num_chapters = save_chapters_as_text(temp_epub, chapters_dir)
    logging.info(f"Extracted {num_chapters} chapters.")
    return chapters_dir
|
|
def split_long_sentence(sentence, max_length=250):
    """Break a sentence into fragments no longer than *max_length* characters.

    Each cut lands on the last space before the limit; when a stretch has no
    space at all, the text is cut hard at *max_length*. Fragments are stripped
    of surrounding whitespace and the final remainder is always appended.
    """
    fragments = []
    remainder = sentence
    while len(remainder) > max_length:
        cut = remainder.rfind(' ', 0, max_length)
        if cut < 0:
            cut = max_length
        fragments.append(remainder[:cut].strip())
        remainder = remainder[cut:].strip()
    fragments.append(remainder)
    return fragments
|
|
def combine_wav_files(file_list, output_file):
    """Concatenate the WAV files in *file_list* into one WAV at *output_file*."""
    # AudioSegment supports '+' for concatenation, so sum() folds the list.
    merged = sum(
        (AudioSegment.from_wav(path) for path in file_list),
        AudioSegment.empty(),
    )
    merged.export(output_file, format="wav")
|
|
def convert_ebook_to_audiobook(ebook_file_path, model, gender=None, pitch=None, speed=None, prompt_text=None, prompt_speech=None):
    """Convert an entire eBook into an audiobook WAV file.

    Processes chapters, splits sentences, runs TTS for each fragment,
    and combines all fragments with brief silences between chapters.
    Returns the path of the final combined WAV file in ./Audiobooks.
    """
    # Sort chapter files by their numeric suffix so chapter_10 follows
    # chapter_9 (a plain lexicographic sort would not).
    chapters_dir = create_chapter_labeled_book(ebook_file_path)
    chapter_files = sorted(
        [os.path.join(chapters_dir, f) for f in os.listdir(chapters_dir) if f.startswith("chapter_") and f.endswith(".txt")],
        key=lambda x: int(re.findall(r'\d+', os.path.basename(x))[0])
    )
    output_dir = os.path.join(".", "Audiobooks")
    ensure_directory(output_dir)
    chapter_audio_files = []
    temp_audio_dir = os.path.join(".", "Working_files", "temp_audio")
    ensure_directory(temp_audio_dir)

    for chapter_file in tqdm(chapter_files, desc="Processing Chapters"):
        with open(chapter_file, 'r', encoding='utf-8') as f:
            text = f.read()
        # Sentence-split first, then cap fragment length so each TTS call
        # stays within a manageable input size.
        sentences = sent_tokenize(text)
        fragment_audio_files = []
        counter = 0
        for sentence in tqdm(sentences, desc=f"Processing {os.path.basename(chapter_file)}", leave=False):
            fragments = split_long_sentence(sentence)
            for frag in fragments:
                if frag:
                    frag_wav = run_tts(frag, model, prompt_text=prompt_text, prompt_speech=prompt_speech,
                                       gender=gender, pitch=pitch, speed=speed, save_dir=temp_audio_dir)
                    # Rename the timestamped fragment to a deterministic
                    # chapter-and-position name for later combining.
                    new_frag_wav = os.path.join(temp_audio_dir, f"{os.path.basename(chapter_file)}_{counter}.wav")
                    os.rename(frag_wav, new_frag_wav)
                    fragment_audio_files.append(new_frag_wav)
                    counter += 1
        chapter_audio = os.path.join(temp_audio_dir, f"{os.path.basename(chapter_file)}_combined.wav")
        combine_wav_files(fragment_audio_files, chapter_audio)
        chapter_audio_files.append(chapter_audio)

    # Insert 2 seconds of silence after every chapter (including the last one).
    silence = AudioSegment.silent(duration=2000)
    final_audio = AudioSegment.empty()
    for f in chapter_audio_files:
        seg = AudioSegment.from_wav(f)
        final_audio += seg + silence
    # Output file is named after the input eBook, with a .wav extension.
    final_output = os.path.join(output_dir, os.path.splitext(os.path.basename(ebook_file_path))[0] + ".wav")
    final_audio.export(final_output, format="wav")
    return final_output
|
|
|
|
| |
| |
| |
|
|
def build_ui(model_dir, device=0):
    """Build the Gradio app: three tabs (voice clone, voice creation, eBook
    conversion) sharing a single Spark-TTS model instance.

    Returns the gr.Blocks demo, ready to launch.
    """
    # Load the model once at startup; every tab callback closes over it.
    model = initialize_model(model_dir, device=device)

    def voice_clone(text, prompt_text, prompt_wav_upload, prompt_wav_record):
        # Uploaded prompt audio wins over a recorded one; with no prompt at
        # all, fall back to the default "male" preset voice.
        if prompt_wav_upload or prompt_wav_record:
            prompt_speech = prompt_wav_upload if prompt_wav_upload else prompt_wav_record
            gender_used = None
        else:
            prompt_speech = None
            gender_used = "male"
        return run_tts(text, model, prompt_text=prompt_text, prompt_speech=prompt_speech, gender=gender_used)

    def voice_creation(text, gender, pitch, speed):
        # Map the 1-5 slider positions onto the model's level labels.
        pitch_val = LEVELS_MAP_UI[int(pitch)]
        speed_val = LEVELS_MAP_UI[int(speed)]
        return run_tts(text, model, gender=gender, pitch=pitch_val, speed=speed_val)

    def ebook_conversion(ebook_file, gender, pitch, speed, prompt_text, prompt_wav_upload, prompt_wav_record):
        # Prompt audio (voice cloning) takes precedence; otherwise use the
        # gender/pitch/speed controls.
        if prompt_wav_upload or prompt_wav_record:
            prompt_speech = prompt_wav_upload if prompt_wav_upload else prompt_wav_record
            gender_used = None
            pitch_val = None
            speed_val = None
        else:
            prompt_speech = None
            gender_used = gender
            pitch_val = LEVELS_MAP_UI[int(pitch)]
            speed_val = LEVELS_MAP_UI[int(speed)]
        # gr.File may hand back a tempfile-like object or a plain path string.
        ebook_file_path = ebook_file.name if hasattr(ebook_file, "name") else ebook_file
        return convert_ebook_to_audiobook(
            ebook_file_path, model,
            gender=gender_used, pitch=pitch_val, speed=speed_val,
            prompt_text=prompt_text, prompt_speech=prompt_speech
        )

    with gr.Blocks() as demo:
        gr.HTML('<h1 style="text-align: center;">Spark-TTS by SparkAudio – Enhanced eBook Converter</h1>')
        with gr.Tabs():

            # Tab 1: clone a voice from a reference prompt recording.
            with gr.TabItem("Voice Clone"):
                gr.Markdown("### Upload reference audio or record a prompt")
                with gr.Row():
                    prompt_wav_upload = gr.Audio(sources="upload", type="filepath",
                                                 label="Upload Prompt Audio (>=16kHz)")
                    prompt_wav_record = gr.Audio(sources="microphone", type="filepath",
                                                 label="Record Prompt Audio")
                with gr.Row():
                    text_input = gr.Textbox(label="Text", lines=3, placeholder="Enter text")
                    prompt_text_input = gr.Textbox(label="Prompt Text (Optional)", lines=3,
                                                   placeholder="Enter prompt text")
                audio_output_clone = gr.Audio(label="Generated Audio", autoplay=True, streaming=True)
                btn_clone = gr.Button("Generate Voice Clone")
                btn_clone.click(
                    voice_clone,
                    inputs=[text_input, prompt_text_input, prompt_wav_upload, prompt_wav_record],
                    outputs=audio_output_clone
                )

            # Tab 2: synthesize a custom voice from gender/pitch/speed presets.
            with gr.TabItem("Voice Creation"):
                gr.Markdown("### Create a custom voice")
                with gr.Row():
                    gender = gr.Radio(choices=["male", "female"], value="male", label="Gender")
                    pitch = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Pitch")
                    speed = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Speed")
                text_input_creation = gr.Textbox(label="Input Text", lines=3,
                                                 placeholder="Enter text",
                                                 value="Generate custom voice sample.")
                audio_output_creation = gr.Audio(label="Generated Audio", autoplay=True, streaming=True)
                btn_create = gr.Button("Create Voice")
                btn_create.click(
                    voice_creation,
                    inputs=[text_input_creation, gender, pitch, speed],
                    outputs=audio_output_creation
                )

            # Tab 3: full eBook-to-audiobook conversion.
            with gr.TabItem("eBook Conversion"):
                gr.Markdown("### Convert an eBook into an Audiobook")
                ebook_file = gr.File(label="Upload eBook File (e.g., epub, mobi, pdf, txt)",
                                     file_types=[".epub", ".mobi", ".pdf", ".txt"])
                with gr.Row():
                    gender_ebook = gr.Radio(choices=["male", "female"], value="male", label="Gender")
                    pitch_ebook = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Pitch")
                    speed_ebook = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Speed")
                prompt_text_ebook = gr.Textbox(label="Prompt Text (Optional)", lines=3,
                                               placeholder="Enter prompt text for voice cloning")
                with gr.Row():
                    prompt_wav_upload_ebook = gr.Audio(sources="upload", type="filepath",
                                                       label="Upload Prompt Audio (>=16kHz)")
                    prompt_wav_record_ebook = gr.Audio(sources="microphone", type="filepath",
                                                       label="Record Prompt Audio")
                audio_output_ebook = gr.Audio(label="Generated Audiobook", autoplay=True, streaming=True)
                btn_ebook = gr.Button("Convert eBook")
                btn_ebook.click(
                    ebook_conversion,
                    inputs=[ebook_file, gender_ebook, pitch_ebook, speed_ebook, prompt_text_ebook,
                            prompt_wav_upload_ebook, prompt_wav_record_ebook],
                    outputs=audio_output_ebook
                )
    return demo
|
|
def parse_arguments():
    """Define and parse the command-line options for the converter."""
    parser = argparse.ArgumentParser(description="Spark-TTS eBook Converter")
    # (flag, type, default, help) specs keep the option table compact.
    option_specs = [
        ("--model_dir", str, "pretrained_models/Spark-TTS-0.5B", "Path to the model directory."),
        ("--device", int, 0, "GPU device id"),
        ("--server_name", str, "0.0.0.0", "Server host"),
        ("--server_port", int, 7860, "Server port"),
    ]
    for flag, flag_type, default, help_text in option_specs:
        parser.add_argument(flag, type=flag_type, default=default, help=help_text)
    return parser.parse_args()
|
|
| if __name__ == "__main__": |
| args = parse_arguments() |
| demo = build_ui(args.model_dir, args.device) |
| demo.launch(server_name=args.server_name, server_port=args.server_port) |