File size: 2,734 Bytes
76374ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# tts_handler.py

import edge_tts
import asyncio
import tempfile
import subprocess
import os

# Language default (environment variable)
DEFAULT_LANGUAGE = os.getenv('DEFAULT_LANGUAGE', 'en-US')

# OpenAI voice names mapped to edge-tts equivalents
voice_mapping = {
    'alloy': 'en-US-AvaNeural',
    'echo': 'en-US-AndrewNeural',
    'fable': 'en-GB-SoniaNeural',
    'onyx': 'en-US-EricNeural',
    'nova': 'en-US-SteffanNeural',
    'shimmer': 'en-US-EmmaNeural'
}

async def _generate_audio(text, voice, response_format, speed):
    # Determine if the voice is an OpenAI-compatible voice or a direct edge-tts voice
    edge_tts_voice = voice_mapping.get(voice, voice)  # Use mapping if in OpenAI names, otherwise use as-is

    # Generate the TTS output in mp3 format first
    temp_output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
    communicator = edge_tts.Communicate(text, edge_tts_voice)
    await communicator.save(temp_output_file.name)

    # If the requested format is mp3 and speed is 1.0, return the generated file directly
    if response_format == "mp3" and speed == 1.0:
        return temp_output_file.name

    # Convert to the requested format if not mp3 or if speed adjustment is needed
    converted_output_file = tempfile.NamedTemporaryFile(delete=False, suffix=f".{response_format}")
    
    # ffmpeg playback speed adjustment
    speed_filter = f"atempo={speed}" if response_format != "pcm" else f"asetrate=44100*{speed},aresample=44100"
    ffmpeg_command = [
        "ffmpeg", "-i", temp_output_file.name, 
        "-filter:a", speed_filter,  # Apply speed adjustment
        "-f", response_format, "-y",
        converted_output_file.name
    ]

    try:
        subprocess.run(ffmpeg_command, check=True)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Error in audio conversion: {e}")

    return converted_output_file.name

def generate_speech(text, voice, response_format, speed=1.0):
    return asyncio.run(_generate_audio(text, voice, response_format, speed))

def get_models():
    return [
        {"id": "tts-1", "name": "Text-to-speech v1"},
        {"id": "tts-1-hd", "name": "Text-to-speech v1 HD"}
    ]

async def _get_voices(language=None):
    # List all voices, filter by language if specified
    all_voices = await edge_tts.list_voices()
    language = language or DEFAULT_LANGUAGE  # Use default if no language specified
    filtered_voices = [
        {"name": v['ShortName'], "gender": v['Gender'], "language": v['Locale']}
        for v in all_voices if language == 'all' or language is None or v['Locale'] == language
    ]
    return filtered_voices

def get_voices(language=None):
    return asyncio.run(_get_voices(language))