Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import sys | |
| import uuid | |
| import zipfile | |
| import shutil | |
| import random | |
| import nltk | |
| import gradio as gr | |
| from g2p_en import G2p | |
| from pydub import AudioSegment | |
# Root folder for voice assets, laid out as assets/characters/<category>/<character>/.
# Created up front so directory scans work on a fresh checkout.
BASE_CHARACTERS_DIR = os.path.join("assets", "characters")
os.makedirs(BASE_CHARACTERS_DIR, exist_ok=True)

# Fetch the NLTK part-of-speech tagger data under both resource names so the
# lookup succeeds whichever name the installed NLTK release asks for.
nltk.download('averaged_perceptron_tagger')
nltk.download('averaged_perceptron_tagger_eng', quiet=True)
class TextToSpeech:
    """Build speech for one character by stitching together recorded WAV clips.

    ``character_folder`` is searched for per-phoneme clips named
    ``<PHONEME>.wav`` (optionally with ``_<number>`` variants) and for
    whole-word clips under ``words/<WORD>.wav``.
    """

    # When a phoneme has no clip of its own it is approximated by the listed
    # phonemes, looked up recursively and played in order.
    PHONEME_MAPPING = {
        'AW': ['AE', 'OW'], 'DH': ['D'], 'EY': ['EH', 'IY'], 'JH': ['CH'],
        'SH': ['CH'], 'TH': ['D'], 'ZH': ['CH'], 'AE': ['AA'],
        'AO': ['AA', 'OW'], 'ER': ['AA'], 'IH': ['IY'],
        'OY': ['OW', 'Y', 'IY'], 'UH': ['UW'], 'AH': ['AA']
    }

    def __init__(self, character_folder):
        """Create a synthesiser that reads its clips from *character_folder*."""
        self.character_folder = character_folder
        self.g2p = G2p()
        self.word_pause_ms = 1  # silence inserted after every word
        self.fade_duration_ms = 10  # crossfade between adjacent voiced clips
        # Every loaded clip is converted to these specs before mixing.
        self.target_channels = 1
        self.target_rate = 44100

    def _pick_random_variant(self, base_path):
        """Return a random file matching ``<name>.wav`` or ``<name>_<n>.wav``.

        The match is case-insensitive and limited to the directory of
        *base_path*. Returns ``None`` when that directory does not exist or
        holds no matching file.
        """
        directory = os.path.dirname(base_path)
        base_name = os.path.splitext(os.path.basename(base_path))[0]
        if not os.path.isdir(directory):
            return None
        pattern = re.compile(rf"^{re.escape(base_name)}(_\d+)?\.wav$", re.IGNORECASE)
        candidates = [
            os.path.join(directory, name)
            for name in os.listdir(directory)
            if pattern.match(name)
        ]
        return random.choice(candidates) if candidates else None

    def _normalize_audio(self, filepath):
        """Load *filepath* and convert it to the target channel count and rate.

        Returns ``None`` when the file cannot be loaded or converted, so
        callers can fall back to another source instead of aborting.
        """
        try:
            audio = AudioSegment.from_wav(filepath)
            if audio.channels > 1:
                audio = audio.set_channels(self.target_channels)
            if audio.frame_rate != self.target_rate:
                audio = audio.set_frame_rate(self.target_rate)
            return audio
        except Exception:
            # Deliberate best effort: one bad clip must not break a sentence.
            return None

    def _silence(self, duration_ms):
        """Return a silent clip of *duration_ms* at the target sample rate."""
        return AudioSegment.silent(duration=duration_ms, frame_rate=self.target_rate)

    def _get_phoneme_data(self, phoneme):
        """Return audio for *phoneme*, or ``None`` when nothing usable exists.

        A recorded clip is preferred. If there is none, or it cannot be
        loaded, the phoneme is assembled from its ``PHONEME_MAPPING``
        substitutes, crossfaded together.
        """
        if phoneme == "AH0":
            # Unstressed AH is voiced as either AA or AH, chosen at random.
            return self._get_phoneme_data(random.choice(["AA", "AH"]))

        clip_path = self._pick_random_variant(
            os.path.join(self.character_folder, f"{phoneme}.wav")
        )
        if clip_path:
            clip = self._normalize_audio(clip_path)
            if clip:
                return clip
            # Unreadable clip: fall through to the substitutes below.

        combined_audio = None
        for sub_phoneme in self.PHONEME_MAPPING.get(phoneme, ()):
            sub_audio = self._get_phoneme_data(sub_phoneme)
            if not sub_audio:
                continue
            if combined_audio:
                fade = min(self.fade_duration_ms, len(combined_audio), len(sub_audio))
                combined_audio = combined_audio.append(sub_audio, crossfade=fade)
            else:
                combined_audio = sub_audio
        return combined_audio

    def _word_to_phonemes(self, token):
        """Return the phoneme symbols to voice for *token*."""
        # Stress digits are stripped, except on AH0 which
        # _get_phoneme_data handles as a special case.
        stripped = [p if p == "AH0" else re.sub(r'\d+', '', p) for p in self.g2p(token)]
        # Drop anything the converter emitted that is not a phoneme symbol.
        phonemes = [p for p in stripped if re.match(r'[A-Z]+[0-9]*', p)]
        if phonemes and phonemes[-1] in ("AH", "AE", "AH0"):
            phonemes[-1] = random.choice(["AA", "AH"])
        return phonemes

    def _join_segments(self, raw_segments):
        """Concatenate segments, crossfading only between two voiced clips."""
        final_audio = raw_segments[0]["audio"]
        for previous, current in zip(raw_segments, raw_segments[1:]):
            curr_audio = current["audio"]
            fade_size = 0
            if not previous["is_pause"] and not current["is_pause"]:
                fade_size = min(self.fade_duration_ms, len(final_audio), len(curr_audio))
            if fade_size > 0:
                final_audio = final_audio.append(curr_audio, crossfade=fade_size)
            else:
                final_audio += curr_audio
        return final_audio

    def generate_audio_data(self, str_input):
        """Synthesize *str_input* and return the resulting audio segment.

        Punctuation becomes a pause (400 ms for ``. ! ?``, 220 ms for
        ``, ;``). Each word uses a whole-word clip when one can be loaded and
        is otherwise spelled out phoneme by phoneme. Input that yields no
        segments at all produces 100 ms of silence.
        """
        raw_segments = []
        for token in re.findall(r"[\w']+|[.,!?;]", str_input):
            if token in (".", "!", "?", ",", ";"):
                dur_ms = 400 if token in (".", "!", "?") else 220
                raw_segments.append({"audio": self._silence(dur_ms), "is_pause": True})
                continue

            word_audio = None
            word_wav = self._pick_random_variant(
                os.path.join(self.character_folder, "words", f"{token.upper()}.wav")
            )
            if word_wav:
                word_audio = self._normalize_audio(word_wav)

            if word_audio:
                raw_segments.append({"audio": word_audio, "is_pause": False})
            else:
                # No word clip, or it failed to load: spell the word out
                # instead of silently dropping it.
                for phoneme in self._word_to_phonemes(token):
                    seg_audio = self._get_phoneme_data(phoneme)
                    if seg_audio:
                        raw_segments.append({"audio": seg_audio, "is_pause": False})

            raw_segments.append(
                {"audio": self._silence(self.word_pause_ms), "is_pause": True}
            )

        if not raw_segments:
            return self._silence(100)
        return self._join_segments(raw_segments)

    def render_to_file(self, str_input, output_path):
        """Synthesize *str_input* and write it to *output_path* as WAV."""
        audio_segment = self.generate_audio_data(str_input)
        audio_segment.export(output_path, format="wav")
| # --- Helper functions for Managing Categories & ZIP uploads --- | |
def get_hierarchy():
    """Return ``{category: [character, ...]}`` for the assets directory.

    Categories are the sub-directories of ``BASE_CHARACTERS_DIR`` and
    characters are the sub-directories of each category. Both levels are
    sorted, and categories without any character folder are omitted. An
    empty dict is returned when the base directory does not exist.
    """
    if not os.path.isdir(BASE_CHARACTERS_DIR):
        return {}

    mapping = {}
    for category in sorted(os.listdir(BASE_CHARACTERS_DIR)):
        category_dir = os.path.join(BASE_CHARACTERS_DIR, category)
        if not os.path.isdir(category_dir):
            continue
        characters = sorted(
            entry
            for entry in os.listdir(category_dir)
            if os.path.isdir(os.path.join(category_dir, entry))
        )
        if characters:
            mapping[category] = characters
    return mapping
def handle_zip_upload(file_obj):
    """Unpack an uploaded voice-pack zip into ``BASE_CHARACTERS_DIR``.

    Every folder in the archive that directly contains ``.wav`` files is
    copied, sub-folders included, as one character. Its category is the name
    of its parent folder, or "Uploaded" when it sits at the top of the
    archive. An existing character of the same name is replaced.

    Args:
        file_obj: The uploaded file, either a path string or an object whose
            ``name`` attribute is the path. ``None`` means nothing was
            uploaded.

    Returns:
        A tuple ``(category_update, character_update, status_message)`` for
        the category dropdown, the character dropdown and the status text.
    """
    if file_obj is None:
        return gr.update(), gr.update(), "No file uploaded."

    zip_path = getattr(file_obj, "name", file_obj)
    temp_extract = os.path.join("assets", f"temp_{uuid.uuid4().hex[:6]}")
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_extract)

        imported = 0
        for root, dirs, files in os.walk(temp_extract):
            # Archives created on macOS carry a metadata folder whose
            # "._*.wav" stubs would otherwise look like voice clips.
            if "__MACOSX" in dirs:
                dirs.remove("__MACOSX")
            if not any(f.lower().endswith('.wav') for f in files):
                continue

            # This folder is a character. Do not descend any further: its
            # sub-folders (e.g. words/) belong to it and are copied with it,
            # rather than being registered as characters of their own.
            dirs[:] = []

            if os.path.normpath(root) == os.path.normpath(temp_extract):
                # Clips at the top of the archive: name the character after
                # the zip file instead of the throw-away extraction folder.
                char_name = os.path.splitext(os.path.basename(zip_path))[0]
                category_name = "Uploaded"
            else:
                char_name = os.path.basename(root)
                parent_dir = os.path.dirname(root)
                if os.path.normpath(parent_dir) == os.path.normpath(temp_extract):
                    category_name = "Uploaded"
                else:
                    category_name = os.path.basename(parent_dir)

            dest_dir = os.path.join(BASE_CHARACTERS_DIR, category_name, char_name)
            os.makedirs(os.path.dirname(dest_dir), exist_ok=True)
            if os.path.exists(dest_dir):
                shutil.rmtree(dest_dir)
            shutil.copytree(root, dest_dir)
            imported += 1

        if not imported:
            return gr.update(), gr.update(), "No folders containing .wav files were found in the archive."

        # Refresh configuration selections
        hierarchy = get_hierarchy()
        cats = list(hierarchy)
        default_cat = cats[0] if cats else None
        default_chars = hierarchy[default_cat] if default_cat else []
        return (
            gr.update(choices=cats, value=default_cat),
            gr.update(choices=default_chars, value=default_chars[0] if default_chars else None),
            "Voice pack uploaded and cataloged successfully!"
        )
    except Exception as e:
        # UI boundary: report the failure in the status field instead of crashing.
        return gr.update(), gr.update(), f"Error processing file: {str(e)}"
    finally:
        # Always remove the extraction folder, including after a failure.
        shutil.rmtree(temp_extract, ignore_errors=True)
def update_characters(category):
    """Return a dropdown update listing the characters of *category*.

    The first character becomes the selected value; an unknown or empty
    category yields no choices and no selection.
    """
    available = get_hierarchy().get(category, [])
    selected = available[0] if available else None
    return gr.update(choices=available, value=selected)
def update_profile_preview(category, character):
    """Return the path of the character's ``profile.png``, or ``None``.

    ``None`` is returned when either argument is empty or when the image
    file does not exist.
    """
    if category and character:
        candidate = os.path.join(BASE_CHARACTERS_DIR, category, character, "profile.png")
        return candidate if os.path.exists(candidate) else None
    return None
def synthesize(category, character, text):
    """Render *text* in the selected character's voice and return the WAV path.

    Args:
        category: Name of the selected voice category.
        character: Name of the selected character within that category.
        text: The sentence(s) to speak.

    Returns:
        Path of the generated WAV file, written to the system temp directory.

    Raises:
        gr.Error: If no category/character is selected, the text is blank,
            or the character's asset folder does not exist.
    """
    import tempfile

    if not category or not character:
        raise gr.Error("Please ensure a valid Category and Character are active.")
    # `text` may be None as well as empty or whitespace-only.
    if not text or not text.strip():
        raise gr.Error("Text field cannot be left blank.")

    char_path = os.path.join(BASE_CHARACTERS_DIR, category, character)
    if not os.path.isdir(char_path):
        # Without this check a missing folder would quietly yield silence.
        raise gr.Error("The selected character's voice assets could not be found.")

    tts = TextToSpeech(char_path)
    # Write to the temp directory so generated clips do not pile up next to
    # the application's source files.
    out_filename = os.path.join(
        tempfile.gettempdir(), f"output_{uuid.uuid4().hex[:8]}.wav"
    )
    tts.render_to_file(text, out_filename)
    return out_filename
| # --- Gradio UI Block Setup --- | |
# Snapshot of the available voices, used for the widgets' initial state.
initial_hierarchy = get_hierarchy()
initial_cats = list(initial_hierarchy.keys())
initial_chars = initial_hierarchy[initial_cats[0]] if initial_cats else []

with gr.Blocks(theme=gr.themes.Soft(primary_hue="amber", neutral_hue="slate")) as demo:
    gr.Markdown("# 🎙️ Sentence Mixing TTS Generator")
    gr.Markdown("An elegant web interface for sentence-mixing speech generation. Upload voice line assets or choose a character configuration to begin.")

    with gr.Row():
        # Left column: character picker and profile picture.
        with gr.Column(scale=1):
            # NOTE: the former `circle=True` argument was removed because
            # gr.Image does not accept such a keyword.
            profile_preview = gr.Image(
                value=update_profile_preview(initial_cats[0], initial_chars[0]) if initial_chars else None,
                label="Character Profile",
                height=220,
                width=220,
                interactive=False,
            )
            category_drop = gr.Dropdown(choices=initial_cats, value=initial_cats[0] if initial_cats else None, label="Voice Category")
            character_drop = gr.Dropdown(choices=initial_chars, value=initial_chars[0] if initial_chars else None, label="Character")
            # Changing the category refills the character list; changing the
            # character refreshes the profile picture.
            category_drop.change(update_characters, inputs=category_drop, outputs=character_drop)
            character_drop.change(update_profile_preview, inputs=[category_drop, character_drop], outputs=profile_preview)

        # Right column: text input and synthesized result.
        with gr.Column(scale=2):
            input_text = gr.Textbox(label="Text to Synthesize", lines=6, placeholder="Type your text sentence here...")
            submit_btn = gr.Button("📢 Speak / Generate", variant="primary")
            audio_output = gr.Audio(label="Synthesized Audio Output", type="filepath")
            submit_btn.click(synthesize, inputs=[category_drop, character_drop, input_text], outputs=audio_output)

    with gr.Accordion("⚙️ Upload New Voice Assets (.zip)", open=False):
        gr.Markdown(
            "\n"
            "### Expected `.zip` Internal Structure\n"
            "You can pack folders into your zip file. For example:\n"
            "* `MyCharacter/AA.wav`, `MyCharacter/B.wav`, etc.\n"
            "* `MyCharacter/words/HELLO.wav` (Optional)\n"
            "* `MyCharacter/profile.png` (Optional round-cropped display icon)\n"
        )
        zip_uploader = gr.File(label="Choose Voice Zip File", file_types=[".zip"])
        upload_status = gr.Markdown(value="Waiting for file upload...")
        upload_btn = gr.Button("📦 Unpack & Register Voice Pack")
        upload_btn.click(
            handle_zip_upload,
            inputs=zip_uploader,
            outputs=[category_drop, character_drop, upload_status]
        )

if __name__ == "__main__":
    demo.launch()