|
|
"""Models module for Kokoro TTS Local""" |
|
|
from typing import Optional, Tuple, List |
|
|
import torch |
|
|
from kokoro import KPipeline |
|
|
import os |
|
|
import json |
|
|
import codecs |
|
|
from pathlib import Path |
|
|
import numpy as np |
|
|
import shutil |
|
|
|
|
|
|
|
|
# Force UTF-8 for interpreter I/O so voice names and phoneme strings
# containing non-ASCII characters round-trip without mojibake.
os.environ["PYTHONIOENCODING"] = "utf-8"

# Hugging Face Hub warns when symlinks are unavailable (e.g. Windows
# without developer mode); the warning is noise here, so silence it.
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
|
|
|
|
|
|
|
|
# Voice model files published in the hexgrad/Kokoro-82M repository on
# Hugging Face (see download_voice_files).  The filename prefix appears
# to encode language/accent and gender (e.g. 'af' = American English
# female, 'am' = American English male, 'bf'/'bm' = British English) —
# presumed from the repo's naming convention; confirm against upstream.
VOICE_FILES = [
    # American English, female
    "af_alloy.pt", "af_aoede.pt", "af_bella.pt", "af_jessica.pt",
    "af_kore.pt", "af_nicole.pt", "af_nova.pt", "af_river.pt",
    "af_sarah.pt", "af_sky.pt",

    # American English, male
    "am_adam.pt", "am_echo.pt", "am_eric.pt", "am_fenrir.pt",
    "am_liam.pt", "am_michael.pt", "am_onyx.pt", "am_puck.pt",
    "am_santa.pt",

    # British English, female
    "bf_alice.pt", "bf_emma.pt", "bf_isabella.pt", "bf_lily.pt",

    # British English, male
    "bm_daniel.pt", "bm_fable.pt", "bm_george.pt", "bm_lewis.pt",

    # Other languages (first letter presumably selects the language;
    # TODO confirm the mapping against the upstream voice list)
    "el_dora.pt", "em_alex.pt", "em_santa.pt",
    "ff_siwis.pt",
    "hf_alpha.pt", "hf_beta.pt",
    "hm_omega.pt", "hm_psi.pt",
    "jf_sara.pt", "jm_nicola.pt",
    "jf_alpha.pt", "jf_gongtsuene.pt", "jf_nezumi.pt", "jf_tebukuro.pt",
    "jm_kumo.pt",
    "pf_dora.pt", "pm_alex.pt", "pm_santa.pt",
    "zf_xiaobei.pt", "zf_xiaoni.pt", "zf_xiaoqiao.pt", "zf_xiaoyi.pt"
]
|
|
|
|
|
|
|
|
# Keep a handle to the stock implementation before monkey-patching
# KPipeline.load_voice below, so it can be restored manually if needed.
original_load_voice = KPipeline.load_voice
|
|
|
|
|
def patched_load_voice(self, voice_path):
    """Load a voice tensor from disk into the pipeline's voice cache.

    Replacement for ``KPipeline.load_voice`` that loads with
    ``weights_only=False`` for compatibility with voice files that
    pickle full objects.

    Args:
        self: The pipeline instance (gains ``device``/``voices``
            attributes here if it does not have them yet).
        voice_path: Filesystem path to a ``.pt`` voice file.

    Returns:
        The voice tensor, moved to ``self.device`` and cached in
        ``self.voices`` under the file's stem.

    Raises:
        FileNotFoundError: If ``voice_path`` does not exist.
        ValueError: If the file deserializes to ``None``.
    """
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    voice_name = Path(voice_path).stem

    # Default device and cache may be missing if the pipeline was built
    # before this patch was installed.
    if not hasattr(self, 'device'):
        self.device = 'cpu'
    if not hasattr(self, 'voices'):
        self.voices = {}

    # Serve repeat requests from the cache instead of re-reading disk.
    if voice_name in self.voices:
        return self.voices[voice_name]

    # map_location keeps CPU-only machines working even when the tensor
    # was saved from CUDA.  weights_only=False is needed for legacy
    # voice files; these are locally downloaded files, not untrusted input.
    voice_model = torch.load(voice_path, map_location='cpu', weights_only=False)
    if voice_model is None:
        raise ValueError(f"Failed to load voice model from {voice_path}")

    self.voices[voice_name] = voice_model.to(self.device)
    return self.voices[voice_name]
|
|
|
|
|
# Install the compatibility loader in place of KPipeline's own.
KPipeline.load_voice = patched_load_voice
|
|
|
|
|
def patch_json_load():
    """Globally replace ``json.load`` with a UTF-8-tolerant version.

    The replacement reads the whole stream, decodes it as UTF-8
    (retrying with BOM stripping and character replacement on decode
    errors), then parses it with ``json.loads``.  All positional and
    keyword arguments (``object_hook``, ``parse_float``, ...) are
    forwarded — the previous implementation silently dropped them.
    """

    def custom_load(fp, *args, **kwargs):
        """Drop-in ``json.load`` that decodes file content as UTF-8 first."""
        try:
            # Text wrappers expose the underlying byte stream as .buffer;
            # reading bytes directly sidesteps a mis-configured text encoding.
            if hasattr(fp, 'buffer'):
                content = fp.buffer.read().decode('utf-8')
            else:
                content = fp.read()
            return json.loads(content, *args, **kwargs)
        except UnicodeDecodeError:
            # Retry from the start, stripping a BOM and replacing any
            # undecodable bytes rather than failing outright.
            fp.seek(0)
            content = fp.read()
            if isinstance(content, bytes):
                content = content.decode('utf-8-sig', errors='replace')
            return json.loads(content, *args, **kwargs)

    json.load = custom_load
|
|
|
|
|
def load_config(config_path: str) -> dict:
    """Load a JSON configuration file, tolerating an optional UTF-8 BOM.

    Args:
        config_path: Path to the JSON file.

    Returns:
        The parsed configuration dictionary.
    """
    # 'utf-8-sig' transparently strips a leading BOM and reads plain
    # UTF-8 unchanged, so a single open covers both cases.  The previous
    # two-step fallback never triggered: a BOM decodes cleanly under
    # plain 'utf-8' (as U+FEFF) and then broke JSON parsing with a
    # JSONDecodeError that the UnicodeDecodeError handler never caught.
    with codecs.open(config_path, 'r', encoding='utf-8-sig') as f:
        return json.load(f)
|
|
|
|
|
|
|
|
# Optional phonemizer support: TTS works without it; only the phoneme
# visualization feature depends on it.
phonemizer_available = False


def _wire_espeak():
    """Point phonemizer at the espeak-ng library bundled by espeakng_loader.

    Raises:
        ImportError: If phonemizer or espeakng_loader is not installed.
    """
    from phonemizer.backend.espeak.wrapper import EspeakWrapper
    import espeakng_loader

    # Set up espeak-ng library paths and make the shared library loadable.
    library_path = espeakng_loader.get_library_path()
    data_path = espeakng_loader.get_data_path()
    espeakng_loader.make_library_available()

    EspeakWrapper.library_path = library_path
    EspeakWrapper.data_path = data_path


def _test_phonemizer() -> bool:
    """Run a one-word smoke test; return True when phonemization works."""
    from phonemizer import phonemize

    if phonemize('test', language='en-us'):
        print("Phonemizer successfully initialized")
        return True
    print("Note: Phonemization returned empty result")
    print("TTS will work, but phoneme visualization will be disabled")
    return False


try:
    _wire_espeak()
    try:
        phonemizer_available = _test_phonemizer()
    except Exception as e:
        print(f"Note: Phonemizer not available: {e}")
        print("TTS will work, but phoneme visualization will be disabled")
except ImportError:
    # The packages are missing entirely; try installing them once, then
    # repeat the wiring + smoke test.
    print("Installing required phonemizer packages...")
    import subprocess
    import sys
    try:
        # Use the running interpreter's pip; a bare "pip" on PATH may
        # belong to a different environment (or not exist at all).
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "espeakng-loader", "phonemizer-fork"]
        )
        _wire_espeak()
        try:
            phonemizer_available = _test_phonemizer()
        except Exception as e:
            print(f"Note: Phonemizer still not functional: {e}")
            print("TTS will work, but phoneme visualization will be disabled")
    except Exception as e:
        print(f"Note: Could not install or initialize phonemizer: {e}")
        print("TTS will work, but phoneme visualization will be disabled")
|
|
|
|
|
|
|
|
# Module-level singleton pipeline, created lazily by build_model().
_pipeline = None
|
|
|
|
|
def download_voice_files():
    """Download voice files from Hugging Face.

    Fetches every file in VOICE_FILES that is not already present in the
    local 'voices' directory, staging downloads in 'temp_voices' (removed
    afterwards).  Individual download failures are reported as warnings
    and skipped rather than aborting the whole run.

    Returns:
        list[str]: Names of voice files now available locally (both
        freshly downloaded and pre-existing).
    """
    voices_dir = Path("voices")
    voices_dir.mkdir(exist_ok=True)

    # Imported lazily so the module can be imported without
    # huggingface_hub until a download is actually needed.
    from huggingface_hub import hf_hub_download
    downloaded_voices = []

    print("\nDownloading voice files...")
    for voice_file in VOICE_FILES:
        try:
            voice_path = voices_dir / voice_file
            if not voice_path.exists():
                print(f"Downloading {voice_file}...")
                # hf_hub_download recreates the repo's 'voices/...' layout
                # under local_dir; the file is then moved into our flat
                # local 'voices' directory.
                temp_path = hf_hub_download(
                    repo_id="hexgrad/Kokoro-82M",
                    filename=f"voices/{voice_file}",
                    local_dir="temp_voices",
                    force_download=True
                )
                # Ensure the destination directory exists before the move.
                os.makedirs(os.path.dirname(voice_path), exist_ok=True)
                shutil.move(temp_path, voice_path)
                downloaded_voices.append(voice_file)
                print(f"Successfully downloaded {voice_file}")
            else:
                print(f"Voice file {voice_file} already exists")
                downloaded_voices.append(voice_file)
        except Exception as e:
            # Best-effort: one failed voice should not abort the rest.
            print(f"Warning: Failed to download {voice_file}: {e}")
            continue

    # Remove the staging directory left behind by hf_hub_download.
    if os.path.exists("temp_voices"):
        shutil.rmtree("temp_voices")

    if not downloaded_voices:
        print("Warning: No voice files could be downloaded. Please check your internet connection.")
    else:
        print(f"Successfully processed {len(downloaded_voices)} voice files")

    return downloaded_voices
|
|
|
|
|
def build_model(model_path: str, device: str) -> KPipeline:
    """Build and return the singleton Kokoro pipeline.

    Downloads the model weights, config, and voice files on first use,
    then constructs and caches a KPipeline.  Subsequent calls return
    the cached instance regardless of the arguments.

    Args:
        model_path: Path to the model weights, or None for the default
            'kokoro-v1_0.pth' (downloaded if missing).
        device: Device identifier ('cuda' or 'cpu') stored on the pipeline.

    Returns:
        The shared KPipeline instance.

    Raises:
        ValueError: If no voice files are available or the pipeline
            fails to initialize.
    """
    global _pipeline
    if _pipeline is not None:
        return _pipeline

    try:
        # Ensure json.load copes with UTF-8/BOM-encoded config files.
        patch_json_load()

        if model_path is None:
            model_path = 'kokoro-v1_0.pth'

        # Imported once up front: both the model and the config download
        # branches need it.  (Previously it was imported only inside the
        # model branch, so an existing model with a missing config.json
        # raised NameError.)
        from huggingface_hub import hf_hub_download

        if not os.path.exists(model_path):
            print(f"Downloading model file {model_path}...")
            model_path = hf_hub_download(
                repo_id="hexgrad/Kokoro-82M",
                filename="kokoro-v1_0.pth",
                local_dir=".",
                force_download=True
            )
            print(f"Model downloaded to {model_path}")

        config_path = "config.json"
        if not os.path.exists(config_path):
            print("Downloading config file...")
            config_path = hf_hub_download(
                repo_id="hexgrad/Kokoro-82M",
                filename="config.json",
                local_dir=".",
                force_download=True
            )
            print(f"Config downloaded to {config_path}")

        downloaded_voices = download_voice_files()
        if not downloaded_voices:
            print("Error: No voice files available. Cannot proceed.")
            raise ValueError("No voice files available")

        # 'a' selects American English (see generate_speech docstring).
        # NOTE(review): voices for other languages are downloaded, but
        # the pipeline language is fixed here — confirm whether
        # per-language pipelines are needed.
        _pipeline = KPipeline(lang_code='a')
        if _pipeline is None:
            raise ValueError("Failed to initialize KPipeline - pipeline is None")

        _pipeline.device = device

        # Make sure the voice cache exists before any load_voice call.
        if not hasattr(_pipeline, 'voices'):
            _pipeline.voices = {}

        # Pre-load only the first loadable voice; the rest are loaded
        # lazily on demand (see generate_speech).
        for voice_file in downloaded_voices:
            voice_path = f"voices/{voice_file}"
            if os.path.exists(voice_path):
                try:
                    _pipeline.load_voice(voice_path)
                    print(f"Successfully loaded voice: {voice_file}")
                    break
                except Exception as e:
                    print(f"Warning: Failed to load voice {voice_file}: {e}")
                    continue
    except Exception as e:
        print(f"Error initializing pipeline: {e}")
        raise

    return _pipeline
|
|
|
|
|
def list_available_voices() -> List[str]:
    """List all available voice model names (without the .pt extension).

    Creates the 'voices' directory if it does not exist yet.

    Returns:
        The stems of all .pt files found in the 'voices' directory,
        or an empty list when none are present.
    """
    voices_dir = Path("voices")

    if not voices_dir.exists():
        print(f"Creating voices directory at {voices_dir.absolute()}")
        voices_dir.mkdir(exist_ok=True)
        return []

    voice_files = list(voices_dir.glob("*.pt"))

    if not voice_files:
        # The old "move files from the root voices directory" fallback was
        # dead code: Path(".").glob("voices/*.pt") scans the very same
        # directory that was already empty, so there was never anything
        # to move.  Both user-facing messages are preserved.
        print(f"No voice files found in {voices_dir.absolute()}")
        print("No voice files found. Please run the application again to download voices.")
        return []

    return [f.stem for f in voice_files]
|
|
|
|
|
def load_voice(voice_name: str, device: str) -> torch.Tensor:
    """Load a voice model by name, initialising the pipeline if needed.

    Args:
        voice_name: Voice identifier, with or without the '.pt' suffix.
        device: Device to build/use the pipeline on.

    Returns:
        The loaded voice tensor.

    Raises:
        ValueError: If the corresponding voice file does not exist.
    """
    pipeline = build_model(None, device)

    # Accept either 'af_bella' or 'af_bella.pt' from callers.
    base_name = voice_name.replace('.pt', '')
    voice_path = f"voices/{base_name}.pt"

    if not os.path.exists(voice_path):
        raise ValueError(f"Voice file not found: {voice_path}")

    return pipeline.load_voice(voice_path)
|
|
|
|
|
def generate_speech(
    model: KPipeline,
    text: str,
    voice: str,
    lang: str = 'a',
    device: str = 'cpu',
    speed: float = 1.0
) -> Tuple[Optional[torch.Tensor], Optional[str]]:
    """Generate speech using the Kokoro pipeline.

    Args:
        model: KPipeline instance
        text: Text to synthesize
        voice: Voice name (e.g. 'af_bella')
        lang: Language code ('a' for American English, 'b' for British
            English).  NOTE(review): currently unused — the pipeline's
            language is fixed at construction time (see build_model);
            kept for interface compatibility.
        device: Device to use ('cuda' or 'cpu')
        speed: Speech speed multiplier (default: 1.0)

    Returns:
        Tuple of (audio tensor, phonemes string) or (None, None) on error.
        Audio from all generated segments is concatenated.
    """
    try:
        if model is None:
            raise ValueError("Model is None - pipeline not properly initialized")

        # Older pipelines may lack these attributes; create them so the
        # voice cache checks below are safe.
        if not hasattr(model, 'voices'):
            model.voices = {}
        if not hasattr(model, 'device'):
            model.device = device

        # Accept either 'af_bella' or 'af_bella.pt'.
        voice_name = voice.replace('.pt', '')
        voice_path = f"voices/{voice_name}.pt"
        if not os.path.exists(voice_path):
            raise ValueError(f"Voice file not found: {voice_path}")

        # Load the voice lazily on first use.
        if voice_name not in model.voices:
            print(f"Loading voice {voice_name}...")
            model.load_voice(voice_path)
            if voice_name not in model.voices:
                raise ValueError(f"Failed to load voice {voice_name}")

        print(f"Generating speech with device: {model.device}")
        generator = model(
            text,
            voice=voice_path,
            speed=speed,
            split_pattern=r'\n+'
        )

        # Collect every generated segment.  The text is split on newlines,
        # and the previous implementation returned after the first segment,
        # silently dropping the rest of a multi-line input.
        audio_segments = []
        phoneme_parts = []
        for gs, ps, audio in generator:
            if audio is None:
                continue
            if isinstance(audio, np.ndarray):
                audio = torch.from_numpy(audio).float()
            audio_segments.append(audio)
            if ps:
                phoneme_parts.append(ps)

        if not audio_segments:
            return None, None

        # Avoid an unnecessary copy for the common single-segment case.
        if len(audio_segments) == 1:
            combined = audio_segments[0]
        else:
            combined = torch.cat(audio_segments)
        return combined, '\n'.join(phoneme_parts) if phoneme_parts else None
    except Exception as e:
        print(f"Error generating speech: {e}")
        return None, None