| import spaces |
| import gradio as gr |
| import edge_tts |
| import asyncio |
| import tempfile |
| import os |
| import re |
| from pathlib import Path |
| from pydub import AudioSegment |
| import librosa |
| import soundfile as sf |
| import numpy as np |
| from pydub import AudioSegment |
| from pydub.playback import play |
| from scipy.signal import butter, lfilter |
|
|
|
|
| def apply_low_pass_filter(audio_segment, cutoff_freq, sample_rate, order=5): |
| """Applies a low-pass filter to a pydub AudioSegment.""" |
| audio_np = np.array(audio_segment.get_array_of_samples()).astype(np.float32) / (2**15 - 1) |
| if audio_segment.channels == 2: |
| audio_np = audio_np.reshape(-1, 2) |
|
|
| nyquist_freq = 0.5 * sample_rate |
| normalized_cutoff = cutoff_freq / nyquist_freq |
| b, a = butter(order, normalized_cutoff, btype='low', analog=False) |
|
|
| filtered_data = np.zeros_like(audio_np, dtype=np.float32) |
| if audio_segment.channels == 1: |
| filtered_data = lfilter(b, a, audio_np) |
| else: |
| for channel in range(audio_segment.channels): |
| filtered_data[:, channel] = lfilter(b, a, audio_np[:, channel]) |
|
|
| filtered_data_int16 = (filtered_data * (2**15 - 1)).astype(np.int16) |
| filtered_audio = AudioSegment(filtered_data_int16.tobytes(), |
| frame_rate=sample_rate, |
| sample_width=audio_segment.sample_width, |
| channels=audio_segment.channels) |
| return filtered_audio |
|
|
|
|
| def get_silence(duration_ms=1000): |
| |
| silent_audio = AudioSegment.silent( |
| duration=duration_ms, |
| frame_rate=24000 |
| ) |
| |
| silent_audio = silent_audio.set_channels(1) |
| silent_audio = silent_audio.set_sample_width(4) |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: |
| |
| silent_audio.export( |
| tmp_file.name, |
| format="mp3", |
| bitrate="48k", |
| parameters=[ |
| "-ac", "1", |
| "-ar", "24000", |
| "-sample_fmt", "s32", |
| "-codec:a", "libmp3lame" |
| ] |
| ) |
| return tmp_file.name |
|
|
| |
| async def get_voices(): |
| try: |
| voices = await edge_tts.list_voices() |
| return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices} |
| except Exception as e: |
| print(f"Error listing voices: {e}") |
| return {} |
|
|
| async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch, overall_target_duration_ms=None, speed_adjustment_factor=1.0): |
| """Generates audio for a text segment, handling voice prefixes and adjusting rate for duration.""" |
| current_voice_full = default_voice |
| current_voice_short = current_voice_full.split(" - ")[0] if current_voice_full else "" |
| current_rate = rate |
| current_pitch = pitch |
| processed_text = text_segment.strip() |
| |
| voice_map = { |
| "1F": "en-GB-SoniaNeural", |
| "2M": "en-GB-RyanNeural", |
| "3M": "en-US-BrianMultilingualNeural", |
| "2F": "en-US-JennyNeural", |
| "1M": "en-AU-WilliamNeural", |
| "3F": "en-HK-YanNeural", |
| "4M": "en-GB-ThomasNeural", |
| "4F": "en-US-EmmaNeural", |
| "1O": "en-GB-RyanNeural", |
| "1C": "en-GB-MaisieNeural", |
| "1V": "vi-VN-HoaiMyNeural", |
| "2V": "vi-VN-NamMinhNeural", |
| "3V": "vi-VN-HoaiMyNeural", |
| "4V": "vi-VN-NamMinhNeural", |
| } |
| detect = 0 |
| |
| for prefix, voice_short in voice_map.items(): |
| if processed_text.startswith(prefix): |
| current_voice_short = voice_short |
| if prefix in ["1F", "3F", "1V", "3V"]: |
| current_pitch = 0 |
| elif prefix in ["1O", "4V"]: |
| current_pitch = -20 |
| current_rate = -10 |
| detect = 1 |
| processed_text = processed_text[len(prefix):].strip() |
| break |
| |
| match = re.search(r"^(-?\d+)\s*(.*)", processed_text) |
| if match: |
| |
| number = match.group(1) |
| print(f"Prefix match found.") |
| current_pitch += int(number) |
| |
| |
| processed_text = match.group(2) |
| |
| |
|
|
| if processed_text: |
| rate_str = f"{current_rate:+d}%" |
| pitch_str = f"{current_pitch:+d}Hz" |
| print(f"Sending to Edge: '{processed_text}'") |
| try: |
| communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str) |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: |
| audio_path = tmp_file.name |
| await communicate.save(audio_path) |
|
|
| if os.path.exists(audio_path): |
| audio = AudioSegment.from_mp3(audio_path) |
| |
| def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10): |
| trim_ms = 0 |
| assert chunk_size > 0 |
| while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound): |
| trim_ms += chunk_size |
| return trim_ms |
|
|
| start_trim = detect_leading_silence(audio) |
| end_trim = detect_leading_silence(audio.reverse()) |
| trimmed_audio = audio[start_trim:len(audio)-end_trim] |
| trimmed_audio.export(audio_path, format="mp3") |
| return audio_path |
|
|
| except Exception as e: |
| print(f"Edge TTS error processing '{processed_text}': {e}") |
| return None |
| return None |
|
|
| async def process_transcript_line(line, next_line_start_time, default_voice, rate, pitch, overall_duration_ms, speed_adjustment_factor): |
| """Processes a single transcript line with HH:MM:SS,milliseconds timestamp.""" |
| match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line) |
| if match: |
| start_h, start_m, start_s, start_ms, text_parts = match.groups() |
| start_time_ms = ( |
| int(start_h) * 3600000 + |
| int(start_m) * 60000 + |
| int(start_s) * 1000 + |
| int(start_ms) |
| ) |
| audio_segments = [] |
| split_parts = re.split(r'[“”"]', text_parts) |
| process_next = False |
| for part in split_parts: |
| if part == '"': |
| process_next = not process_next |
| continue |
| if process_next and part.strip(): |
| audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, overall_duration_ms, speed_adjustment_factor) |
| if audio_path: |
| audio_segments.append(audio_path) |
| elif not process_next and part.strip(): |
| audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, overall_duration_ms, speed_adjustment_factor) |
| if audio_path: |
| audio_segments.append(audio_path) |
|
|
| if audio_segments: |
| combined_audio = AudioSegment.empty() |
| for segment_path in audio_segments: |
| try: |
| segment = AudioSegment.from_mp3(segment_path) |
| combined_audio += segment |
| os.remove(segment_path) |
| except Exception as e: |
| print(f"Error loading or combining audio segment {segment_path}: {e}") |
| return None, None, None |
|
|
| combined_audio_path = f"combined_audio_{start_time_ms}.mp3" |
| try: |
| combined_audio.export(combined_audio_path, format="mp3") |
| return start_time_ms, [combined_audio_path], overall_duration_ms |
| except Exception as e: |
| print(f"Error exporting combined audio: {e}") |
| return None, None, None |
|
|
| return start_time_ms, [], overall_duration_ms |
|
|
| return None, None, None |
|
|
| async def transcript_to_speech(transcript_text, voice, rate, pitch, speed_adjustment_factor): |
| if not transcript_text.strip(): |
| return None, gr.Warning("Please enter transcript text.") |
| if not voice: |
| return None, gr.Warning("Please select a voice.") |
| lines = transcript_text.strip().split('\n') |
| timed_audio_segments = [] |
| max_end_time_ms = 0 |
|
|
| for i, line in enumerate(lines): |
| next_line_start_time = None |
| if i < len(lines) - 1: |
| next_line_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', lines[i+1]) |
| if next_line_match: |
| nh, nm, ns, nms = next_line_match.groups() |
| next_line_start_time = ( |
| int(nh) * 3600000 + |
| int(nm) * 60000 + |
| int(ns) * 1000 + |
| int(nms) |
| ) |
|
|
| current_line_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line) |
| if current_line_match: |
| sh, sm, ss, sms, text_content = current_line_match.groups() |
| start_time_ms = ( |
| int(sh) * 3600000 + |
| int(sm) * 60000 + |
| int(ss) * 1000 + |
| int(sms) |
| ) |
| overall_duration_ms = None |
| if next_line_start_time is not None: |
| overall_duration_ms = next_line_start_time - start_time_ms |
|
|
| start_time, audio_paths, duration = await process_transcript_line(line, next_line_start_time, voice, rate, pitch, overall_duration_ms, speed_adjustment_factor) |
|
|
| if start_time is not None and audio_paths: |
| combined_line_audio = AudioSegment.empty() |
| total_generated_duration_ms = 0 |
| for path in audio_paths: |
| if path: |
| try: |
| audio = AudioSegment.from_mp3(path) |
| combined_line_audio += audio |
| total_generated_duration_ms += len(audio) |
| os.remove(path) |
| except FileNotFoundError: |
| print(f"Warning: Audio file not found: {path}") |
|
|
| if combined_line_audio and overall_duration_ms is not None and overall_duration_ms > 0 and total_generated_duration_ms > overall_duration_ms: |
| speed_factor = (total_generated_duration_ms / overall_duration_ms) * speed_adjustment_factor |
| if speed_factor > 0: |
| if speed_factor < 1.0: |
| speed_factor = 1.0 |
| combined_line_audio = combined_line_audio.speedup(playback_speed=speed_factor) |
|
|
| if combined_line_audio: |
| timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio}) |
| max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio)) |
|
|
| elif audio_paths: |
| for path in audio_paths: |
| if path: |
| try: |
| os.remove(path) |
| except FileNotFoundError: |
| pass |
|
|
| if not timed_audio_segments: |
| return None, "No processable audio segments found." |
|
|
| final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000) |
| for segment in timed_audio_segments: |
| final_audio = final_audio.overlay(segment['audio'], position=segment['start']) |
| |
| |
| cutoff_frequency = 3500 |
| filtered_final_audio = apply_low_pass_filter(final_audio, cutoff_frequency, final_audio.frame_rate) |
|
|
| combined_audio_path = tempfile.mktemp(suffix=".mp3") |
| |
| filtered_final_audio.export(combined_audio_path, format="mp3") |
| return combined_audio_path, None |
|
|
| @spaces.GPU |
| def tts_interface(transcript, voice, rate, pitch, speed_adjustment_factor): |
| audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch, speed_adjustment_factor)) |
| return audio, warning |
|
|
| async def create_demo(): |
| voices = await get_voices() |
| default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)" |
| description = """ |
| Process timestamped text (HH:MM:SS,milliseconds) with voice changes within quotes. |
| The duration for each line is determined by the timestamp of the following line. |
| The speed of the ENTIRE generated audio for a line will be adjusted to fit within this duration. |
| If there is no subsequent timestamp, the speed adjustment will be skipped. |
| You can control the intensity of the speed adjustment using the "Speed Adjustment Factor" slider. |
| Format: `HH:MM:SS,milliseconds "VoicePrefix Text" more text "AnotherVoicePrefix More Text"` |
| Example: |
| ``` |
| 00:00:00,000 "This is the default voice." more default. "1F Now a female voice." and back to default. |
| 00:00:05,500 "1C Yes," said the child, "it is fun!" |
| ``` |
| *************************************************************************************************** |
| 1M = en-AU-WilliamNeural - en-AU (Male) |
| 1F = en-GB-SoniaNeural - en-GB (Female) |
| 2M = en-GB-RyanNeural - en-GB (Male) |
| 2F = en-US-JennyNeural - en-US (Female) |
| 3M = en-US-BrianMultilingualNeural - en-US (Male) |
| 3F = en-HK-YanNeural - en-HK (Female) |
| 4M = en-GB-ThomasNeural - en-GB (Male) |
| 4F = en-US-EmmaNeural - en-US (Female) |
| 1O = en-GB-RyanNeural - en-GB (Male) # Old Man |
| 1C = en-GB-MaisieNeural - en-GB (Female) # Child |
| 1V = vi-VN-HoaiMyNeural - vi-VN (Female) # Vietnamese (Female) |
| 2V = vi-VN-NamMinhNeural - vi-VN (Male) # Vietnamese (Male) |
| 3V = vi-VN-HoaiMyNeural - vi-VN (Female) # Vietnamese (Female) |
| 4V = vi-VN-NamMinhNeural - vi-VN (Male) # Vietnamese (Male) |
| **************************************************************************************************** |
| """ |
| demo = gr.Interface( |
| fn=tts_interface, |
| inputs=[ |
| gr.Textbox(label="Timestamped Text with Voice Changes and Duration", lines=10, placeholder='00:00:00,000 "Text" more text "1F Different Voice"'), |
| gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=default_voice), |
| gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1), |
| gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1), |
| gr.Slider(minimum=0.5, maximum=1.5, value=1.0, step=0.05, label="Speed Adjustment Factor") |
| ], |
| outputs=[ |
| gr.Audio(label="Generated Audio", type="filepath"), |
| gr.Markdown(label="Warning", visible=False) |
| ], |
| title="TTS with Line-Wide Duration Adjustment and In-Quote Voice Switching", |
| description=description, |
| analytics_enabled=False, |
| allow_flagging=False |
| ) |
| return demo |
|
|
| if __name__ == "__main__": |
| demo = asyncio.run(create_demo()) |
| demo.launch() |
|
|