|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
|
from config import VOICE_MODE_CLONE, MAXIMUM_VOICE_CLONE_FILE_SIZE_BYTES |
|
|
from ..core.state import ( |
|
|
generation_state_lock, |
|
|
get_stop_generation_requested, |
|
|
set_stop_generation_requested, |
|
|
is_audio_conversion_queue_busy, |
|
|
get_audio_conversion_waiting_count, |
|
|
acquire_generation_protection, |
|
|
release_generation_protection |
|
|
) |
|
|
from ..core.authentication import get_huggingface_token |
|
|
from ..core.memory import ( |
|
|
has_temporary_files_pending_cleanup, |
|
|
cleanup_expired_temporary_files, |
|
|
perform_memory_cleanup, |
|
|
memory_cleanup, |
|
|
trigger_background_cleanup_check |
|
|
) |
|
|
from ..tts.manager import text_to_speech_manager, ModelNotLoadedError, ModelLoadingError |
|
|
from ..validation.text import validate_text_input |
|
|
from ..audio.validator import ( |
|
|
perform_voice_clone_file_validation, |
|
|
get_format_display_name, |
|
|
format_file_size_for_display, |
|
|
validate_file_size_for_voice_cloning |
|
|
) |
|
|
from ..audio.converter import ( |
|
|
prepare_audio_file_for_voice_cloning, |
|
|
AudioConversionQueueBusyError, |
|
|
AudioConversionQueueTimeoutError |
|
|
) |
|
|
|
|
|
def check_if_generating():
    """Return the current value of the shared ``is_currently_generating`` flag.

    The flag is read as an attribute of the state module *while*
    ``generation_state_lock`` is held. A ``from ... import name`` statement
    placed before the ``with`` block would copy the value before the lock is
    acquired, so the lock would not actually guard the read.
    """
    # Function-scope import, matching how perform_speech_generation accesses
    # the state module.
    from ..core import state as global_state

    with generation_state_lock:
        return global_state.is_currently_generating
|
|
|
|
|
def request_generation_stop():
    """Raise the stop-requested flag and return a non-interactive UI update.

    Returns:
        The result of ``gr.update(interactive=False)``; presumably bound to
        the stop button so it cannot be pressed twice (verify against the
        UI wiring).
    """
    set_stop_generation_requested(True)

    disabled_component_update = gr.update(interactive=False)
    return disabled_component_update
|
|
|
|
|
def validate_voice_clone_file_size(voice_clone_audio_file):
    """Check the size of an optional voice-clone upload.

    Args:
        voice_clone_audio_file: The uploaded file reference; a falsy value
            means nothing was uploaded and is treated as valid.

    Returns:
        ``(True, None)`` when there is no file or the size check passes,
        otherwise ``(False, error)`` with the error reported by
        ``validate_file_size_for_voice_cloning``.
    """
    if voice_clone_audio_file:
        size_ok, size_problem = validate_file_size_for_voice_cloning(voice_clone_audio_file)

        if not size_ok:
            return False, size_problem

    return True, None
|
|
|
|
|
def _describe_voice_clone_validation_failure(validation_error, detected_format):
    """Map a validator error string to the user-facing message for a rejected upload."""
    format_display_name = get_format_display_name(detected_format) if detected_format else "Unknown"

    if not validation_error:
        return "The uploaded file could not be validated as a valid audio file."

    # Lower-case once; every check below is a case-insensitive keyword match.
    lowered_error = validation_error.lower()

    if "too short" in lowered_error:
        return "The uploaded audio file is too short. Please upload a longer audio sample for better voice cloning results."

    if "too long" in lowered_error:
        return "The uploaded audio file is too long. Please upload a shorter audio sample (maximum 1 hour)."

    if "empty" in lowered_error or "0 bytes" in lowered_error:
        return "The uploaded audio file is empty. Please upload a valid audio file."

    if "corrupted" in lowered_error or "truncated" in lowered_error:
        return f"The uploaded {format_display_name} file appears to be corrupted or incomplete. Please upload a valid audio file."

    # These validator messages are passed through to the user unchanged.
    if "unsupported" in lowered_error or "exceeds" in lowered_error or "maximum" in lowered_error:
        return validation_error

    return f"Invalid audio file: {validation_error}"


def validate_and_prepare_voice_clone_audio(voice_clone_audio_file):
    """Validate an uploaded voice-clone file and prepare it for cloning.

    The file is size-checked, validated, and then handed to
    ``prepare_audio_file_for_voice_cloning`` (waiting on the conversion queue
    if necessary). A ``gr.Warning`` is shown first when the conversion queue
    is already busy.

    Args:
        voice_clone_audio_file: The uploaded file reference; a falsy value
            means nothing was uploaded.

    Returns:
        A 4-tuple ``(prepared_path, error_message, was_converted, audio_format)``.

        * Success, WAV upload: ``(path, None, False, 'wav')``.
        * Success, other format: ``(path, None, True, detected_format)``.
        * Failure: ``(None, message, None, audio_format)`` where
          ``audio_format`` is ``None`` if the failure happened before format
          detection.
    """
    if not voice_clone_audio_file:
        return None, "Please upload an audio file for voice cloning.", None, None

    file_size_valid, file_size_error = validate_file_size_for_voice_cloning(voice_clone_audio_file)

    if not file_size_valid:
        return None, file_size_error, None, None

    is_valid, is_wav_format, detected_format, validation_error = perform_voice_clone_file_validation(voice_clone_audio_file)

    if not is_valid:
        failure_message = _describe_voice_clone_validation_failure(validation_error, detected_format)
        return None, failure_message, None, detected_format

    format_display_name = get_format_display_name(detected_format)

    if is_audio_conversion_queue_busy():
        waiting_count = get_audio_conversion_waiting_count()

        if waiting_count > 0:
            gr.Warning(f"Audio conversion queue is busy. Your request is queued (position: {waiting_count + 1}). Please wait...")
        else:
            gr.Warning("Audio conversion is in progress for another user. Your request has been queued. Please wait...")

    # The preparation call is the same for WAV and non-WAV uploads; only the
    # interpretation of its result differs, so it is made exactly once.
    try:
        prepared_path, preparation_error, _, _ = prepare_audio_file_for_voice_cloning(
            voice_clone_audio_file,
            wait_for_queue=True
        )
    except (AudioConversionQueueBusyError, AudioConversionQueueTimeoutError) as queue_error:
        return None, str(queue_error), None, detected_format

    if is_wav_format:
        if prepared_path is None:
            return None, f"Failed to process WAV file: {preparation_error}", None, 'wav'

        return prepared_path, None, False, 'wav'

    if prepared_path is None:
        # The error may be None when preparation fails without a message;
        # fall back to an empty string so the keyword checks cannot raise.
        lowered_preparation_error = (preparation_error or "").lower()

        if "no audio conversion library" in lowered_preparation_error:
            return None, f"Cannot convert {format_display_name} format. Please upload a WAV file directly.", None, detected_format

        if "queue" in lowered_preparation_error or "busy" in lowered_preparation_error:
            return None, preparation_error, None, detected_format

        return None, f"Failed to convert {format_display_name} to WAV format: {preparation_error}", None, detected_format

    return prepared_path, None, True, detected_format
|
|
|
|
|
def perform_speech_generation(
    text_input,
    voice_mode_selection,
    voice_preset_selection,
    voice_clone_audio_file,
    model_variant,
    lsd_decode_steps,
    temperature,
    noise_clamp,
    eos_threshold,
    frames_after_eos,
    enable_custom_frames
):
    """Generate speech for ``text_input`` and return the saved audio file path.

    Steps: clean up expired temporary files, validate the text, validate and
    prepare the voice-clone upload (clone mode only), claim the global
    "generating" flag, load the model, resolve the voice state, generate the
    audio and save it. The stop-requested flag is checked after model
    loading, after voice-state resolution and after generation.

    Args:
        text_input: Raw text to synthesize; passed to ``validate_text_input``.
        voice_mode_selection: Compared against ``VOICE_MODE_CLONE`` to choose
            between a cloned voice and a preset voice.
        voice_preset_selection: Preset identifier used when not cloning.
        voice_clone_audio_file: Uploaded reference audio used when cloning.
        model_variant, temperature, lsd_decode_steps, noise_clamp,
        eos_threshold: Forwarded to ``text_to_speech_manager.load_or_get_model``.
        frames_after_eos, enable_custom_frames: Forwarded to
            ``text_to_speech_manager.generate_audio``.

    Returns:
        The path returned by ``text_to_speech_manager.save_audio_to_file``,
        or ``None`` when a stop was requested before completion.

    Raises:
        gr.Error: For invalid input, a generation already in progress, model
            loading problems, or any failure during generation.
    """
    from ..core import state as global_state

    def stop_requested():
        # Read the flag under the lock, as every checkpoint below needs.
        with generation_state_lock:
            return global_state.stop_generation_requested

    if has_temporary_files_pending_cleanup():
        cleanup_expired_temporary_files()

    # On success validation_result is the text later passed to generate_audio;
    # on failure it is used as the error message (if non-empty).
    is_valid, validation_result = validate_text_input(text_input)

    if not is_valid:
        raise gr.Error(validation_result or "Please enter valid text to generate speech.")

    prepared_audio_path = None

    if voice_mode_selection == VOICE_MODE_CLONE:
        if not voice_clone_audio_file:
            raise gr.Error("Please upload an audio file for voice cloning.")

        file_size_valid, file_size_error = validate_voice_clone_file_size(voice_clone_audio_file)
        if not file_size_valid:
            max_size_display = format_file_size_for_display(MAXIMUM_VOICE_CLONE_FILE_SIZE_BYTES)
            raise gr.Error(f"File size exceeds maximum limit of {max_size_display}. {file_size_error}")

        if not get_huggingface_token():
            raise gr.Error("Voice cloning is not configured properly at the moment. Please try again later.")

        prepared_audio_path, audio_error, was_audio_converted, original_audio_format = validate_and_prepare_voice_clone_audio(voice_clone_audio_file)

        if prepared_audio_path is None:
            raise gr.Error(audio_error)

        if was_audio_converted:
            format_display_name = get_format_display_name(original_audio_format)
            gr.Warning(f"Audio converted from {format_display_name} to WAV format for voice cloning.")

    # Check-and-set must be atomic so two requests cannot both start.
    with generation_state_lock:
        if global_state.is_currently_generating:
            raise gr.Error("A generation is already in progress. Please wait.")

        global_state.is_currently_generating = True
        global_state.stop_generation_requested = False

    generated_audio_tensor = None
    cloned_voice_state_tensor = None
    protection_acquired = False

    try:
        # Acquired inside the try so that a failure here still reaches the
        # finally block and resets is_currently_generating; otherwise the
        # flag would stay True and block every later request.
        acquire_generation_protection()
        protection_acquired = True

        perform_memory_cleanup()

        loaded_model = text_to_speech_manager.load_or_get_model(
            model_variant,
            temperature,
            lsd_decode_steps,
            noise_clamp,
            eos_threshold
        )

        if loaded_model is None:
            raise gr.Error("Failed to load TTS model. Please try again.")

        if stop_requested():
            return None

        if voice_mode_selection == VOICE_MODE_CLONE:
            cloned_voice_state_tensor = text_to_speech_manager.get_voice_state_for_clone(
                voice_clone_audio_file,
                prepared_audio_path=prepared_audio_path
            )
            voice_state = cloned_voice_state_tensor
        else:
            voice_state = text_to_speech_manager.get_voice_state_for_preset(voice_preset_selection)

        if stop_requested():
            return None

        generated_audio_tensor = text_to_speech_manager.generate_audio(
            validation_result,
            voice_state,
            frames_after_eos,
            enable_custom_frames
        )

        if stop_requested():
            return None

        output_file_path = text_to_speech_manager.save_audio_to_file(generated_audio_tensor)

        return output_file_path

    except gr.Error:
        # Already user-facing; let it through untouched.
        raise

    except ModelNotLoadedError as model_not_loaded_error:
        raise gr.Error(str(model_not_loaded_error)) from model_not_loaded_error

    except ModelLoadingError as model_loading_error:
        raise gr.Error(f"Failed to load TTS model: {str(model_loading_error)}") from model_loading_error

    except RuntimeError as runtime_error:
        error_message = str(runtime_error)

        if "not loaded" in error_message.lower():
            # Try to bring the model back so the user's retry can succeed.
            if text_to_speech_manager.ensure_model_loaded():
                raise gr.Error("Model was temporarily unavailable. Please try again.") from runtime_error

            raise gr.Error("TTS model could not be loaded. Please try again later.") from runtime_error

        raise gr.Error(error_message) from runtime_error

    except Exception as generation_error:
        error_message = str(generation_error)

        if "file does not start with RIFF id" in error_message:
            raise gr.Error("The audio file format is not supported. Please upload a valid WAV file or a common audio format (MP3, FLAC, OGG, M4A).") from generation_error

        if "unknown format" in error_message.lower():
            raise gr.Error("The audio file uses an unsupported encoding format. Please convert it to a standard format and try again.") from generation_error

        raise gr.Error(f"Speech generation failed: {error_message}") from generation_error

    finally:
        try:
            if protection_acquired:
                release_generation_protection()
        finally:
            # Always reset the shared flags, even if releasing protection fails.
            with generation_state_lock:
                global_state.is_currently_generating = False
                global_state.stop_generation_requested = False

        # Drop local references before the cleanup calls; presumably this
        # lets memory_cleanup() reclaim the tensors (confirm in core.memory).
        generated_audio_tensor = None
        cloned_voice_state_tensor = None

        memory_cleanup()
        trigger_background_cleanup_check()