File size: 5,123 Bytes
35bb6f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import annotations

import shutil
from pathlib import Path

import torch
from loguru import logger

from api.src.core.config import settings
from api.src.core.model_config import BUILTIN_VOICES
from api.src.core.paths import (
    BUILTIN_VOICES_DIR,
    CUSTOM_VOICES_DIR,
    ensure_voice_dirs,
    get_voice_codes,
    get_voice_text,
    get_voice_wav,
    is_custom_voice,
    voice_codes_path,
)


class VoiceManager:
    """Registry of built-in and custom reference voices.

    Holds an in-memory ``name -> metadata`` mapping that is rebuilt by
    :meth:`scan_voices` and kept in sync by :meth:`upload_voice` and
    :meth:`delete_voice`. Every metadata dict carries the keys ``name``,
    ``language``, ``gender``, ``description``, ``custom`` and ``available``.
    """

    # Shared instance, created lazily by get_instance().
    _instance: VoiceManager | None = None

    # Voice names are interpolated into file names under CUSTOM_VOICES_DIR, so
    # these characters would let a name escape that directory or truncate it.
    _FORBIDDEN_NAME_CHARS = ("/", "\\", "\0")

    def __init__(self) -> None:
        """Create an empty registry; call :meth:`scan_voices` to populate it."""
        self._voices: dict[str, dict] = {}

    @classmethod
    def get_instance(cls) -> VoiceManager:
        """Return the shared manager, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def _validate_voice_name(cls, voice_name: str) -> None:
        """Reject names that are unsafe to embed in a file name.

        Raises:
            ValueError: if the name is not a string, is empty/blank, or
                contains a path separator or NUL character.
        """
        if not isinstance(voice_name, str) or not voice_name.strip():
            raise ValueError("Voice name must be a non-empty string")
        if any(ch in voice_name for ch in cls._FORBIDDEN_NAME_CHARS):
            raise ValueError(
                f"Invalid voice name '{voice_name}': path separators are not allowed"
            )

    @staticmethod
    def _unlink_if_present(path: Path) -> None:
        """Delete *path*, tolerating a file that has already disappeared."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    def _cached_code_files(self, voice_name: str) -> list[Path]:
        """List the ``{voice_name}_*.pt`` cache files in the custom voices dir.

        The name is matched literally (never as a glob pattern), so a voice
        called ``*`` cannot match other voices' files. Files that belong to a
        different registered voice whose name merely starts with
        ``{voice_name}_`` (e.g. ``bob_2`` vs ``bob``) are excluded.
        """
        prefix = f"{voice_name}_"
        # Prefixes of longer registered names that share our prefix.
        foreign_prefixes = tuple(
            f"{other}_"
            for other in self._voices
            if other != voice_name and other.startswith(prefix)
        )
        return [
            entry
            for entry in CUSTOM_VOICES_DIR.iterdir()
            if entry.name.startswith(prefix)
            and entry.name.endswith(".pt")
            and not entry.name.startswith(foreign_prefixes)
        ]

    def scan_voices(self) -> None:
        """Rebuild the registry from ``BUILTIN_VOICES`` and the custom voices dir.

        Built-in voices are always listed and are marked available only when
        both their WAV and reference text resolve. Custom voices are discovered
        from ``*.wav`` files in ``CUSTOM_VOICES_DIR``; a custom file whose stem
        collides with a built-in name is ignored.
        """
        ensure_voice_dirs()
        self._voices.clear()

        # Built-in voices
        for name, info in BUILTIN_VOICES.items():
            wav_exists = get_voice_wav(name) is not None
            txt_exists = get_voice_text(name) is not None
            self._voices[name] = {
                "name": name,
                "language": info["language"],
                "gender": info["gender"],
                "description": info["description"],
                "custom": False,
                "available": wav_exists and txt_exists,
            }

        # Custom voices: scan for .wav files
        for wav in CUSTOM_VOICES_DIR.glob("*.wav"):
            name = wav.stem
            if name not in self._voices:
                # The WAV is known to exist here, so availability only
                # depends on the reference text.
                txt_exists = get_voice_text(name) is not None
                self._voices[name] = {
                    "name": name,
                    "language": "unknown",
                    "gender": "unknown",
                    "description": "Custom uploaded voice",
                    "custom": True,
                    "available": txt_exists,
                }

        available = sum(1 for v in self._voices.values() if v.get("available", True))
        logger.info(
            f"Scanned {len(self._voices)} voices ({len(BUILTIN_VOICES)} builtin, {available} available)"
        )

    @property
    def voices(self) -> dict[str, dict]:
        """The live ``name -> metadata`` mapping (not a copy)."""
        return self._voices

    def voice_exists(self, voice_name: str) -> bool:
        """Return whether *voice_name* is currently registered."""
        return voice_name in self._voices

    def get_ref_text(self, voice_name: str) -> str:
        """Return the stripped UTF-8 reference transcript for *voice_name*.

        Raises:
            FileNotFoundError: if no reference text file resolves for the voice.
        """
        txt_path = get_voice_text(voice_name)
        if txt_path is None:
            raise FileNotFoundError(f"No reference text found for voice '{voice_name}'")
        return txt_path.read_text(encoding="utf-8").strip()

    def get_ref_codes(self, voice_name: str, codec_id: str) -> torch.Tensor | None:
        """Load cached reference codes for a voice/codec pair, if any.

        Returns:
            The object stored in the cache file, loaded onto the CPU, or
            ``None`` when no cache file resolves.
        """
        codes_path = get_voice_codes(voice_name, codec_id)
        if codes_path is None:
            return None
        # weights_only=True keeps torch.load from unpickling arbitrary objects.
        return torch.load(codes_path, map_location="cpu", weights_only=True)

    async def get_or_encode_ref_codes(
        self,
        voice_name: str,
        codec_id: str,
        model_manager: object,
        model_id: str,
    ) -> object:
        """Return reference codes for a voice, encoding and caching on a miss.

        Args:
            voice_name: Voice whose reference audio should be encoded.
            codec_id: Codec identifier used to key the cache file.
            model_manager: Object exposing an awaitable
                ``encode_reference(model_id, wav_path)``.
            model_id: Model passed through to ``encode_reference``.

        Returns:
            The cached codes when present, otherwise whatever
            ``encode_reference`` returned (after saving it to the cache).

        Raises:
            FileNotFoundError: if there is no cache and no WAV for the voice.
        """
        codes = self.get_ref_codes(voice_name, codec_id)
        if codes is not None:
            return codes

        wav_path = get_voice_wav(voice_name)
        if wav_path is None:
            raise FileNotFoundError(f"No WAV file found for voice '{voice_name}'")

        logger.info(f"Encoding reference for voice '{voice_name}' with codec '{codec_id}'")
        ref_codes = await model_manager.encode_reference(model_id, str(wav_path))

        # Cache the encoded reference
        custom = is_custom_voice(voice_name)
        save_path = voice_codes_path(voice_name, codec_id, custom=custom)
        torch.save(ref_codes, save_path)
        logger.info(f"Cached reference codes at {save_path}")

        return ref_codes

    def upload_voice(
        self,
        voice_name: str,
        wav_data: bytes,
        ref_text: str,
        language: str = "unknown",
        gender: str = "unknown",
    ) -> Path:
        """Store a custom voice on disk and register it as available.

        Writes ``{voice_name}.wav`` and ``{voice_name}.txt`` into
        ``CUSTOM_VOICES_DIR`` (overwriting an existing custom voice of the same
        name) and drops any cached codes that were encoded from the old audio.

        Args:
            voice_name: Name for the voice; used verbatim in the file names.
            wav_data: Raw bytes of the reference WAV.
            ref_text: Transcript of the reference audio.
            language: Language label stored in the registry entry.
            gender: Gender label stored in the registry entry.

        Returns:
            Path of the written WAV file.

        Raises:
            ValueError: if the name is empty, contains path separators, or
                collides with a built-in voice.
        """
        # Validate before touching the filesystem: the name becomes a path.
        self._validate_voice_name(voice_name)
        if voice_name in BUILTIN_VOICES:
            # scan_voices() ignores custom files that shadow a built-in and
            # delete_voice() refuses built-ins, so accepting this upload would
            # leave the registry inconsistent.
            raise ValueError(f"Cannot overwrite built-in voice '{voice_name}'")

        ensure_voice_dirs()
        wav_path = CUSTOM_VOICES_DIR / f"{voice_name}.wav"
        txt_path = CUSTOM_VOICES_DIR / f"{voice_name}.txt"

        wav_path.write_bytes(wav_data)
        txt_path.write_text(ref_text, encoding="utf-8")

        # Codes cached for a previous upload no longer match the new audio.
        for stale in self._cached_code_files(voice_name):
            self._unlink_if_present(stale)

        self._voices[voice_name] = {
            "name": voice_name,
            "language": language,
            "gender": gender,
            "description": "Custom uploaded voice",
            "custom": True,
            "available": True,
        }

        logger.info(f"Uploaded custom voice '{voice_name}' (lang={language}, gender={gender})")
        return wav_path

    def delete_voice(self, voice_name: str) -> None:
        """Remove a custom voice's files and its registry entry.

        Deletes ``{voice_name}.wav``, ``{voice_name}.txt`` and the voice's
        cached ``{voice_name}_*.pt`` files from ``CUSTOM_VOICES_DIR``.

        Raises:
            ValueError: if the voice is built-in, is not registered, or has a
                name that is unsafe to use in a file path.
        """
        if voice_name in BUILTIN_VOICES:
            raise ValueError(f"Cannot delete built-in voice '{voice_name}'")

        if voice_name not in self._voices:
            raise ValueError(f"Voice '{voice_name}' not found")

        # The registry dict is exposed via `voices`, so re-check the name
        # before building paths from it.
        self._validate_voice_name(voice_name)

        # Remove all files for this voice. Paths are built literally instead
        # of globbing so wildcard characters in the name stay inert.
        targets = [
            CUSTOM_VOICES_DIR / f"{voice_name}.wav",
            CUSTOM_VOICES_DIR / f"{voice_name}.txt",
            *self._cached_code_files(voice_name),
        ]
        for target in targets:
            self._unlink_if_present(target)

        self._voices.pop(voice_name, None)
        logger.info(f"Deleted custom voice '{voice_name}'")