File size: 12,806 Bytes
aacd162
 
 
 
 
74be2eb
aacd162
 
 
 
 
 
 
 
ea7dc28
 
 
 
 
 
 
 
 
 
 
 
 
 
aacd162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74be2eb
 
aacd162
 
74be2eb
62f0a86
74be2eb
 
 
 
ea7dc28
 
 
74be2eb
ea7dc28
 
 
 
 
 
 
 
 
 
 
 
 
74be2eb
 
ea7dc28
74be2eb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62f0a86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74be2eb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aacd162
 
 
 
 
74be2eb
 
 
 
 
 
 
 
 
 
 
 
 
 
62f0a86
74be2eb
 
 
 
 
 
 
 
 
 
aacd162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74be2eb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""
Text-to-Speech adapter supporting multiple providers.
"""
import os
from pathlib import Path
from typing import Any, Literal, Optional
from abc import ABC, abstractmethod
from dotenv import load_dotenv

# Load environment variables from a local .env file (API keys, voice defaults).
load_dotenv()

# TTS Provider type
TTSProvider = Literal["openai", "elevenlabs", "edge"]

# Common ElevenLabs preset voice name -> voice_id mapping.
# This allows env values like "Rachel"/"Antoni" to work with SDK methods that require voice_id.
# Keys are lowercase voice names; values are the public preset voice IDs.
ELEVENLABS_PRESET_VOICE_IDS: dict[str, str] = {
    "rachel": "21m00Tcm4TlvDq8ikWAM",
    "domi": "AZnzlk1XvdvUeBnXmlld",
    "bella": "EXAVITQu4vr4xnSDxMaL",
    "antoni": "ErXwobaYiN019PkySvjV",
    "elli": "MF3mGyEYCl7XYWbV9V6O",
    "josh": "TxGEqnHWrfWFTfGW9XjX",
    "arnold": "VR6AewLTigWG4xSOukaG",
    "adam": "pNInz6obpgDQGcFmaJgB",
    "sam": "yoZ06aMxZJJ28mfd3POQ",
}


class TTSAdapter(ABC):
    """Abstract interface implemented by every TTS provider."""

    @abstractmethod
    def synthesize(self, text: str, output_path: str, voice: Optional[str] = None) -> str:
        """
        Render *text* as speech and write the audio to *output_path*.

        Args:
            text: Text to synthesize
            output_path: Where to save audio file
            voice: Voice identifier (provider-specific)

        Returns:
            Path to generated audio file
        """
        ...


class OpenAITTS(TTSAdapter):
    """OpenAI TTS (good quality, moderate cost)."""

    def __init__(self, api_key: Optional[str] = None, model: Optional[str] = None):
        """
        Args:
            api_key: OpenAI API key; falls back to the OPENAI_API_KEY env var.
            model: TTS model name; falls back to TTS_MODEL env var, then "tts-1".
        """
        import openai
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.model = model or os.getenv("TTS_MODEL", "tts-1")
        self.client = openai.OpenAI(api_key=self.api_key)
        # Default voice is env-configurable per deployment.
        self.default_voice = os.getenv("TTS_OPENAI_VOICE_1", "alloy")

    def synthesize(self, text: str, output_path: str, voice: Optional[str] = None) -> str:
        """
        Synthesize *text* to an audio file via the OpenAI speech API.

        Voices: alloy, echo, fable, onyx, nova, shimmer

        Args:
            text: Text to speak.
            output_path: Destination audio file (parent directories are created).
            voice: Voice name; defaults to TTS_OPENAI_VOICE_1 / "alloy".

        Returns:
            output_path, once the audio has been written.
        """
        voice = voice or self.default_voice

        response = self.client.audio.speech.create(
            model=self.model,
            voice=voice,
            input=text
        )

        # Match ElevenLabsTTS behavior: ensure the destination directory exists
        # before writing, so callers may pass nested output paths.
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "wb") as f:
            f.write(response.content)
        return output_path


class ElevenLabsTTS(TTSAdapter):
    """ElevenLabs TTS (highest quality, paid)."""

    def __init__(self, api_key: Optional[str] = None):
        """
        Args:
            api_key: ElevenLabs API key; falls back to the ELEVENLABS_API_KEY env var.

        Raises:
            ValueError: If no API key is supplied or present in the environment.
        """
        from elevenlabs.client import ElevenLabs

        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError("ELEVENLABS_API_KEY environment variable not set")
        self.client = ElevenLabs(api_key=self.api_key)
        # Defaults are env-configurable; a name like "Rachel" is resolved to a
        # voice_id via _resolve_voice() before synthesis.
        self.default_voice = os.getenv("TTS_ELEVENLABS_VOICE_1", "Rachel")
        self.default_model = os.getenv("TTS_ELEVENLABS_MODEL", "eleven_multilingual_v2")
        self.output_format = os.getenv("TTS_ELEVENLABS_OUTPUT_FORMAT", "mp3_44100_128")
        # Lowercased name -> voice_id map, seeded with presets and extended
        # from the account's voices when the SDK allows listing them.
        self._voice_aliases = self._load_voice_aliases()

    def _load_voice_aliases(self) -> dict[str, str]:
        """Best-effort map of configured voice names to voice IDs."""
        aliases: dict[str, str] = dict(ELEVENLABS_PRESET_VOICE_IDS)

        # First try the latest SDK shape.
        try:
            voices_api = getattr(self.client, "voices", None)
            if voices_api is None:
                return aliases

            # Different SDK versions expose different listing methods.
            if hasattr(voices_api, "get_all"):
                response = voices_api.get_all()
            elif hasattr(voices_api, "search"):
                response = voices_api.search()
            elif hasattr(voices_api, "list"):
                response = voices_api.list()
            else:
                return aliases

            voices = getattr(response, "voices", response)
        except Exception:
            # Listing is best-effort only; fall back to the preset table.
            return aliases

        for voice in voices or []:
            # Entries may be plain dicts or model objects depending on SDK version.
            if isinstance(voice, dict):
                name = voice.get("name")
                voice_id = voice.get("voice_id")
            else:
                name = getattr(voice, "name", None)
                voice_id = getattr(voice, "voice_id", None)
            if name and voice_id:
                aliases[str(name).strip().lower()] = str(voice_id).strip()
        return aliases

    def _resolve_voice(self, voice: str) -> str:
        """Map a voice name to a known voice_id; otherwise return the input as-is."""
        candidate = str(voice or "").strip()
        if not candidate:
            candidate = self.default_voice
        return self._voice_aliases.get(candidate.lower(), candidate)

    def _try_call_variants(self, method: Any, variants: list[dict[str, Any]]) -> Any:
        """
        Some ElevenLabs SDK versions use different parameter names.
        Try a small set of known-compatible signatures.
        """
        last_type_error: TypeError | None = None
        for kwargs in variants:
            try:
                return method(**kwargs)
            except TypeError as exc:
                # A TypeError here means this signature doesn't match; try the next.
                last_type_error = exc
                continue
        if last_type_error:
            raise last_type_error
        raise RuntimeError("Unable to call ElevenLabs SDK method with known signatures.")

    def _request_audio(self, text: str, voice_candidate: str, model_candidate: str) -> Any:
        """
        Support both legacy and modern ElevenLabs Python SDK APIs:
        - legacy: client.generate(...)
        - modern: client.text_to_speech.convert(...)/convert_as_stream(...)
        """
        # Legacy SDK: top-level generate() with several historical signatures.
        if hasattr(self.client, "generate"):
            return self._try_call_variants(
                self.client.generate,
                [
                    {"text": text, "voice": voice_candidate, "model": model_candidate},
                    {"text": text, "voice": voice_candidate, "model_id": model_candidate},
                    {"text": text, "voice_id": voice_candidate, "model_id": model_candidate},
                ],
            )

        # Modern SDK: text_to_speech namespace; prefer the streaming variant.
        tts_api = getattr(self.client, "text_to_speech", None)
        if tts_api is not None:
            if hasattr(tts_api, "convert_as_stream"):
                return self._try_call_variants(
                    tts_api.convert_as_stream,
                    [
                        {
                            "text": text,
                            "voice_id": voice_candidate,
                            "model_id": model_candidate,
                            "output_format": self.output_format,
                        },
                        {
                            "text": text,
                            "voice_id": voice_candidate,
                            "model_id": model_candidate,
                        },
                        {"text": text, "voice": voice_candidate, "model": model_candidate},
                    ],
                )

            if hasattr(tts_api, "convert"):
                return self._try_call_variants(
                    tts_api.convert,
                    [
                        {
                            "text": text,
                            "voice_id": voice_candidate,
                            "model_id": model_candidate,
                            "output_format": self.output_format,
                        },
                        {
                            "text": text,
                            "voice_id": voice_candidate,
                            "model_id": model_candidate,
                        },
                        {"text": text, "voice": voice_candidate, "model": model_candidate},
                    ],
                )

        raise AttributeError(
            "No compatible ElevenLabs synthesis method found on client "
            "(expected generate() or text_to_speech.convert())."
        )

    def _write_audio_output(self, audio: Any, output_path: str) -> None:
        """
        ElevenLabs SDK returns either bytes, file-like, or iterable chunks depending
        on version/options. Handle all supported shapes safely.
        """
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "wb") as f:
            # Shape 1: raw bytes payload.
            if isinstance(audio, (bytes, bytearray)):
                f.write(bytes(audio))
                return

            # Shape 2: file-like object exposing read().
            if hasattr(audio, "read"):
                data = audio.read()
                if isinstance(data, str):
                    data = data.encode("utf-8")
                if not isinstance(data, (bytes, bytearray)):
                    raise TypeError("ElevenLabs returned unsupported file-like payload.")
                f.write(bytes(data))
                return

            # Shape 3: iterable of chunks (bytes, str, or single ints).
            wrote_any = False
            for chunk in audio:
                if chunk is None:
                    continue
                wrote_any = True
                if isinstance(chunk, int):
                    f.write(bytes([chunk]))
                elif isinstance(chunk, str):
                    f.write(chunk.encode("utf-8"))
                elif isinstance(chunk, (bytes, bytearray)):
                    f.write(bytes(chunk))
                else:
                    raise TypeError(f"Unsupported ElevenLabs audio chunk type: {type(chunk)!r}")

            if not wrote_any:
                raise RuntimeError("ElevenLabs returned an empty audio stream.")

    def synthesize(self, text: str, output_path: str, voice: Optional[str] = None) -> str:
        """
        Popular voices: Rachel, Domi, Bella, Antoni, Elli, Josh, Arnold, Adam, Sam

        Args:
            text: Text to synthesize.
            output_path: Destination audio file (parent directories are created).
            voice: Voice name or voice_id; defaults to the configured default voice.

        Returns:
            output_path, once the audio has been written.

        Raises:
            RuntimeError: If all voice/model candidate combinations fail.
        """
        # Try the resolved voice_id first, then the raw name as a fallback.
        requested_voice = voice or self.default_voice
        resolved_voice = self._resolve_voice(requested_voice)
        voice_candidates = [resolved_voice]
        if requested_voice != resolved_voice:
            voice_candidates.append(requested_voice)

        # Also fall back to the known-good multilingual model if a custom one fails.
        model_candidates = [self.default_model]
        if self.default_model != "eleven_multilingual_v2":
            model_candidates.append("eleven_multilingual_v2")

        errors: list[str] = []
        for voice_candidate in voice_candidates:
            for model_candidate in model_candidates:
                try:
                    audio = self._request_audio(text, voice_candidate, model_candidate)
                    self._write_audio_output(audio, output_path)
                    return output_path
                except Exception as exc:
                    # Record the failure and keep trying the remaining combinations.
                    errors.append(
                        f"voice={voice_candidate}, model={model_candidate}: "
                        f"{type(exc).__name__}: {exc}"
                    )

        # Surface the first few failures so misconfiguration is diagnosable.
        preview = " | ".join(errors[:3]) if errors else "unknown ElevenLabs error"
        raise RuntimeError(f"ElevenLabs synthesis failed. {preview}")


class EdgeTTS(TTSAdapter):
    """Microsoft Edge TTS (free, good quality)."""

    def __init__(self):
        # Default voice is env-configurable; falls back to a US male neural voice.
        self.default_voice = os.getenv("TTS_EDGE_VOICE_1", "en-US-GuyNeural")

    def synthesize(self, text: str, output_path: str, voice: Optional[str] = None) -> str:
        """
        Synthesize *text* using Microsoft Edge's neural TTS voices.

        Popular voices:
        - en-US-AriaNeural (female)
        - en-US-GuyNeural (male)
        - en-GB-SoniaNeural (British female)
        - en-GB-RyanNeural (British male)

        Args:
            text: Text to speak.
            output_path: Destination audio file (parent directories are created).
            voice: Edge voice name; defaults to TTS_EDGE_VOICE_1 / "en-US-GuyNeural".

        Returns:
            output_path, once the audio has been written.
        """
        import edge_tts
        import asyncio

        voice = voice or self.default_voice

        # Match ElevenLabsTTS behavior: ensure the destination directory exists
        # so communicate.save() does not fail on nested output paths.
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        async def _synthesize() -> None:
            communicate = edge_tts.Communicate(text, voice)
            await communicate.save(output_path)

        # edge_tts is async-only; run its coroutine on a private event loop.
        # NOTE(review): asyncio.run() raises if called from a running loop —
        # callers inside async code must use await directly.
        asyncio.run(_synthesize())
        return output_path


def get_tts_adapter(provider: Optional[TTSProvider] = None) -> TTSAdapter:
    """
    Factory function to get TTS adapter.

    Args:
        provider: TTS provider to use (defaults to TTS_PROVIDER from .env).
            Matching is case-insensitive and ignores surrounding whitespace.

    Returns:
        Configured TTS adapter instance

    Raises:
        ValueError: If the provider name is not one of the known adapters.
    """
    # Fall back to the environment, then normalize so hand-edited .env values
    # like "Edge" or " ELEVENLABS " still resolve to the right adapter.
    provider = (provider or os.getenv("TTS_PROVIDER", "edge")).strip().lower()

    adapters = {
        "openai": OpenAITTS,
        "elevenlabs": ElevenLabsTTS,
        "edge": EdgeTTS,
    }

    if provider not in adapters:
        raise ValueError(f"Unknown TTS provider: {provider}. Choose from: {list(adapters.keys())}")

    return adapters[provider]()


# === CLI for testing ===
if __name__ == "__main__":
    import argparse

    cli = argparse.ArgumentParser(description="Test TTS")
    cli.add_argument("--provider", choices=["openai", "elevenlabs", "edge"], help="TTS provider")
    cli.add_argument("--text", default="Hello, this is a test of text to speech synthesis.")
    cli.add_argument("--output", default="test_audio.mp3")
    cli.add_argument("--voice", help="Voice ID (provider-specific)")
    opts = cli.parse_args()

    # Build the configured adapter and run a single synthesis pass.
    adapter = get_tts_adapter(opts.provider)
    result_path = adapter.synthesize(opts.text, opts.output, opts.voice)

    print(f"✓ Audio generated: {result_path}")
    print(f"  Provider: {opts.provider or os.getenv('TTS_PROVIDER', 'edge')}")
    print(f"  Voice: {opts.voice or 'default'}")