File size: 7,653 Bytes
cef5f3b
 
 
 
7b78e6d
cef5f3b
 
 
 
 
 
 
 
 
 
 
7b78e6d
 
e5362b5
7b78e6d
e5362b5
7b78e6d
 
 
 
 
cef5f3b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7b78e6d
 
cef5f3b
 
 
7b78e6d
 
 
 
 
cef5f3b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e5362b5
cef5f3b
 
 
 
 
e5362b5
 
cef5f3b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import os
import base64
import tempfile
import requests
import subprocess
from pathlib import Path
from mistralai.client import Mistral

# ─── Client ───────────────────────────────────────────────────────────────────
def get_client():
    """Build a Mistral client from the ``MISTRAL_API_KEY`` environment variable.

    Returns:
        A ``Mistral`` instance constructed with the key read from the
        environment.

    Raises:
        Exception: If ``MISTRAL_API_KEY`` is unset or empty.
    """
    key = os.environ.get("MISTRAL_API_KEY")
    if key:
        return Mistral(api_key=key)
    raise Exception("MISTRAL_API_KEY secret is not set. Please set it as an environment variable.")

# ─── Utility ──────────────────────────────────────────────────────────────────
def trim_audio_if_needed(audio_path, max_seconds=25):
    """Re-encode ``audio_path`` into a WAV clip of at most ``max_seconds`` seconds.

    Despite the name, the input is always passed through ffmpeg; no duration
    check is made first.

    Args:
        audio_path: Path of the audio file to clip.
        max_seconds: Maximum clip length, passed to ffmpeg's ``-t`` option.

    Returns:
        The path of a newly created temporary ``.wav`` file on success (the
        caller is responsible for deleting it), or ``audio_path`` unchanged
        if ffmpeg is missing or fails.
    """
    # mkstemp creates the file atomically; tempfile.mktemp only invents a name,
    # which is deprecated because another process can claim it first.
    fd, out_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)  # ffmpeg reopens the path itself ("-y" allows the overwrite)
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", audio_path, "-t", str(max_seconds), "-c:a", "pcm_s16le", out_path],
            check=True,
        )
        return out_path
    except Exception as e:
        print(f"Warning: Failed to trim audio, returning original: {e}")
        # Do not leave the empty or partial output file behind on failure.
        try:
            os.remove(out_path)
        except OSError:
            pass
        return audio_path

def list_user_voices():
    """Return a Markdown summary of the voices listed for the account.

    Requests the first 100 voices from the API. Never raises: any failure is
    reported as an ``"Error fetching voices: ..."`` string instead.
    """
    try:
        listing = get_client().audio.voices.list(limit=100, offset=0)
        if listing.total == 0:
            return "No voices found in your account."

        sections = [f"**Total Voices:** {listing.total}\n\n"]
        for entry in listing.items:
            name, voice_id = entry.name, entry.id
            if hasattr(entry, 'languages'):
                langs = ', '.join(entry.languages)
            else:
                langs = 'unknown'
            sections.append(f"- **{name}**\n  - ID: `{voice_id}`\n  - Languages: {langs}\n")
        return "".join(sections)
    except Exception as e:
        return f"Error fetching voices: {str(e)}"

def get_voice_choices():
    """Return ``(name, voice_id)`` pairs for the official Mistral voices.

    Requests the first 100 voices and keeps only those whose name starts with
    one of the official speaker names and contains ``" - "``, so that cloned
    user voices are hidden.

    Returns:
        A list of ``(name, id)`` tuples, or an empty list if the voices could
        not be fetched (a warning is printed in that case).
    """
    # Official Mistral voices (Paul, Oliver, Jane, Marie).
    official_names = ("Paul", "Oliver", "Jane", "Marie")
    try:
        client = get_client()
        res = client.audio.voices.list(limit=100, offset=0)
        return [
            (v.name, v.id)
            for v in res.items
            if v.name.startswith(official_names) and " - " in v.name
        ]
    except Exception as e:
        # Best-effort: callers still get an empty list, but KeyboardInterrupt
        # and SystemExit are no longer swallowed and the cause is visible.
        print(f"Warning: Failed to fetch voice choices: {e}")
        return []


# ─── STT ──────────────────────────────────────────────────────────────────────
def transcribe_audio(audio_path, language):
    """Convert audio file → text using Voxtral Mini Transcribe.

    Args:
        audio_path: Path of the audio file to transcribe.
        language: Language to send with the request. ``"Auto-detect"`` or an
            empty value means no language is sent.

    Returns:
        The ``text`` attribute of the transcription response.

    Raises:
        ValueError: If ``audio_path`` is ``None``.
    """
    if audio_path is None:
        raise ValueError("Please provide an audio file path.")

    mistral = get_client()
    send_language = language != "Auto-detect" and bool(language)
    with open(audio_path, "rb") as audio_file:
        request = {
            "model": "voxtral-mini-latest",
            "file": {"content": audio_file, "file_name": Path(audio_path).name},
        }
        if send_language:
            request["language"] = language
        # The request is made while the file handle is still open.
        result = mistral.audio.transcriptions.complete(**request)
    return result.text


# ─── TTS ──────────────────────────────────────────────────────────────────────
def synthesize_speech(text, voice_id=None, ref_audio_path=None, audio_format="mp3"):
    """Convert text to speech using Voxtral Mini TTS.

    Args:
        text: The text to speak; must contain non-whitespace characters.
        voice_id: ID of a voice to use.
        ref_audio_path: Path of a reference clip for zero-shot tone/voice
            cloning. It is clipped to 25 seconds and sent base64-encoded.
        audio_format: Sent as ``response_format`` and used as the suffix of
            the output file.

    Returns:
        A ``(path, size)`` tuple: the path of a temporary file holding the
        decoded audio (not deleted by this function) and its size in bytes.

    Raises:
        ValueError: If ``text`` is empty/blank, or if neither ``voice_id`` nor
            ``ref_audio_path`` is given.
    """
    # Validate everything up front so bad input fails before a client is
    # created or any audio is processed.
    if not text or not text.strip():
        raise ValueError("Please enter some text.")
    if not voice_id and not ref_audio_path:
        raise ValueError("Mistral API requires a voice! Please provide either a reference audio or a valid Voice ID.")

    client = get_client()

    kwargs = dict(
        model="voxtral-mini-tts-2603",
        input=text,
        response_format=audio_format,
    )
    if voice_id:
        kwargs["voice_id"] = voice_id

    # Add reference audio for zero-shot tone/voice cloning.
    if ref_audio_path:
        clipped_ref_path = trim_audio_if_needed(ref_audio_path, max_seconds=25)
        try:
            with open(clipped_ref_path, "rb") as f:
                kwargs["ref_audio"] = base64.b64encode(f.read()).decode("utf-8")
        finally:
            # Remove the clipped copy even if reading it failed, but never the
            # caller's own file (returned unchanged when trimming fails).
            if clipped_ref_path != ref_audio_path:
                try:
                    os.remove(clipped_ref_path)
                except OSError:
                    pass

    response = client.audio.speech.complete(**kwargs)
    audio_bytes = base64.b64decode(response.audio_data)

    # delete=False: the file must outlive this function so the caller can use it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{audio_format}") as tmp:
        tmp.write(audio_bytes)
    return tmp.name, len(audio_bytes)


# ─── Voice Cloning ────────────────────────────────────────────────────────────
def clone_voice(audio_path, url_input, voice_name, gender, languages_str):
    """Upload a sample audio or provide a URL to create a reusable cloned voice.

    When a URL is given it takes precedence over ``audio_path``. The sample is
    clipped to 25 seconds before upload, and every temporary file created
    along the way is removed even if the upload fails.

    Args:
        audio_path: Path of an uploaded audio clip, or a falsy value.
        url_input: Direct audio-file link or any URL yt-dlp can handle; may be
            empty or ``None``.
        voice_name: Name for the new voice.
        gender: Gender label; sent lower-cased.
        languages_str: Comma-separated language codes; defaults to ``["en"]``
            when empty.

    Returns:
        The object returned by ``client.audio.voices.create``.

    Raises:
        ValueError: If neither an audio clip nor a URL is provided.
        KeyError: If ``voice_name`` is empty or blank.
    """
    url = (url_input or "").strip()
    if not audio_path and not url:
        raise ValueError("Please upload an audio clip or provide a media URL.")
    if not (voice_name or "").strip():
        # NOTE(review): ValueError would fit better, but KeyError is kept so
        # existing callers that catch it keep working.
        raise KeyError("Please enter a name for the voice.")

    # Downloads go into a private directory that is removed on exit, so no
    # downloaded file can leak whatever happens below.
    with tempfile.TemporaryDirectory() as workdir:
        source_path = _download_voice_sample(url, workdir) if url else audio_path

        # Ensure any direct MP3 or uploaded file is ALSO strictly trimmed.
        trimmed_path = trim_audio_if_needed(source_path, max_seconds=25)
        try:
            client = get_client()
            sample_b64 = base64.b64encode(Path(trimmed_path).read_bytes()).decode()
            langs = [l.strip() for l in (languages_str or "").split(",") if l.strip()] or ["en"]
            return client.audio.voices.create(
                name=voice_name.strip(),
                sample_audio=sample_b64,
                sample_filename=Path(trimmed_path).name,
                languages=langs,
                gender=gender.lower(),
            )
        finally:
            # The trimmed clip lives outside workdir; remove it unless trimming
            # failed and handed back the source path itself.
            if trimmed_path != source_path:
                try:
                    os.remove(trimmed_path)
                except OSError:
                    pass


def _download_voice_sample(url, workdir):
    """Download the audio at ``url`` into ``workdir`` and return the file path."""
    base_out = os.path.join(workdir, "sample")

    # If it's a direct audio file link, bypass yt-dlp and download it directly.
    if url.lower().endswith(('.mp3', '.wav', '.flac', '.ogg', '.m4a')):
        ext = url.rsplit('.', 1)[-1]
        target = f"{base_out}.{ext}"
        with requests.get(url, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(target, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return target

    # Otherwise use yt-dlp (imported lazily, only when this branch is taken).
    import yt_dlp
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': base_out + '.%(ext)s',
        'quiet': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '128',
        }],
        'postprocessor_args': [
            '-t', '25'  # Hard limit to 25 seconds to stay under the API's 30s limit
        ],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.extract_info(url, download=True)
    # The FFmpegExtractAudio postprocessor is configured to produce an .mp3.
    return base_out + '.mp3'