"""Helpers around the Mistral audio APIs: transcription, speech synthesis and voice cloning."""

import os
import base64
import tempfile
import subprocess
from pathlib import Path

import requests
from mistralai.client import Mistral

# Reference/sample clips are cut to this many seconds before being sent to the API.
_MAX_SAMPLE_SECONDS = 25

# URL suffixes that are downloaded directly instead of going through yt-dlp.
_DIRECT_AUDIO_EXTENSIONS = ('.mp3', '.wav', '.flac', '.ogg', '.m4a')


# ─── Client ───────────────────────────────────────────────────────────────────
def get_client():
    """Build a Mistral client from the ``MISTRAL_API_KEY`` environment variable.

    Raises:
        RuntimeError: if the variable is unset or empty. ``RuntimeError`` is a
            subclass of ``Exception``, so callers catching ``Exception`` still work.
    """
    api_key = os.environ.get("MISTRAL_API_KEY")
    if not api_key:
        raise RuntimeError("MISTRAL_API_KEY secret is not set. Please set it as an environment variable.")
    return Mistral(api_key=api_key)


# ─── Utility ──────────────────────────────────────────────────────────────────
def _remove_quietly(path):
    """Delete *path*, ignoring the case where it is missing or cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


def trim_audio_if_needed(audio_path, max_seconds=25):
    """Re-encode *audio_path* to a 16-bit PCM WAV cut at *max_seconds* using ffmpeg.

    Returns:
        The path of a newly created temporary ``.wav`` file on success (the
        caller owns it and should delete it), or *audio_path* unchanged when
        trimming fails for any reason.
    """
    out_path = None
    try:
        # mkstemp creates the file atomically; mktemp only returned a name and
        # is deprecated because another process could claim it first.
        fd, out_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", audio_path,
                "-t", str(max_seconds),
                "-c:a", "pcm_s16le",
                out_path,
            ],
            check=True,
        )
        return out_path
    except Exception as e:  # deliberate best-effort: fall back to the untrimmed input
        print(f"Warning: Failed to trim audio, returning original: {e}")
        if out_path is not None:
            _remove_quietly(out_path)
        return audio_path


def list_user_voices():
    """Return a Markdown summary of the first 100 voices on the account.

    Never raises: any failure is reported as an ``"Error fetching voices: ..."`` string.
    """
    try:
        client = get_client()
        result = client.audio.voices.list(limit=100, offset=0)
        if result.total == 0:
            return "No voices found in your account."

        parts = [f"**Total Voices:** {result.total}\n\n"]
        for voice in result.items:
            # A missing, None or empty language list is shown as 'unknown'
            # rather than failing the whole listing.
            languages = getattr(voice, 'languages', None)
            language_text = ', '.join(languages) if languages else 'unknown'
            parts.append(
                f"- **{voice.name}**\n - ID: `{voice.id}`\n - Languages: {language_text}\n"
            )
        return "".join(parts)
    except Exception as e:
        return f"Error fetching voices: {str(e)}"


def get_voice_choices():
    """Return ``(name, id)`` pairs for the official voices among the first 100 listed.

    A voice counts as official when its name starts with Paul, Oliver, Jane or
    Marie and contains ``" - "``; this hides user-cloned voices.
    Returns an empty list if the lookup fails.
    """
    official_names = ("Paul", "Oliver", "Jane", "Marie")
    try:
        client = get_client()
        res = client.audio.voices.list(limit=100, offset=0)
        return [
            (f"{v.name}", v.id)
            for v in res.items
            if v.name.startswith(official_names) and " - " in v.name
        ]
    except Exception:
        # Best-effort: callers get an empty choice list instead of an error.
        return []


# ─── STT ──────────────────────────────────────────────────────────────────────
def transcribe_audio(audio_path, language):
    """Convert audio file → text using Voxtral Mini Transcribe.

    Args:
        audio_path: path of the audio file to upload.
        language: language value forwarded to the API; ``"Auto-detect"`` or an
            empty value omits the parameter.

    Raises:
        ValueError: if *audio_path* is ``None``.
    """
    if audio_path is None:
        raise ValueError("Please provide an audio file path.")

    client = get_client()
    with open(audio_path, "rb") as f:
        kwargs = dict(
            model="voxtral-mini-latest",
            file={"content": f, "file_name": Path(audio_path).name},
        )
        if language and language != "Auto-detect":
            kwargs["language"] = language
        # The request is issued while the file handle is still open.
        response = client.audio.transcriptions.complete(**kwargs)
    return response.text


# ─── TTS ──────────────────────────────────────────────────────────────────────
def synthesize_speech(text, voice_id=None, ref_audio_path=None, audio_format="mp3"):
    """Convert text → speech using Voxtral Mini TTS.

    Args:
        text: text to speak; must contain non-whitespace characters.
        voice_id: ID of an existing voice (optional if *ref_audio_path* is given).
        ref_audio_path: reference clip for zero-shot tone/voice cloning; it is
            trimmed to 25 seconds before upload.
        audio_format: response format, also used as the output file suffix.

    Returns:
        ``(path, size)``: path of a temporary file holding the audio (not
        deleted automatically) and its size in bytes.

    Raises:
        ValueError: if *text* is empty/``None`` or neither a voice ID nor a
            reference clip is supplied.
    """
    if not text or not text.strip():
        raise ValueError("Please enter some text.")

    client = get_client()

    if not voice_id and not ref_audio_path:
        raise ValueError("Mistral API requires a voice! Please provide either a reference audio or a valid Voice ID.")

    kwargs = dict(
        model="voxtral-mini-tts-2603",
        input=text,
        response_format=audio_format,
    )
    if voice_id:
        kwargs["voice_id"] = voice_id

    # Add Reference Audio for Zero-shot tone/voice cloning
    if ref_audio_path:
        clipped_ref_path = trim_audio_if_needed(ref_audio_path, max_seconds=_MAX_SAMPLE_SECONDS)
        try:
            kwargs["ref_audio"] = base64.b64encode(
                Path(clipped_ref_path).read_bytes()
            ).decode("utf-8")
        finally:
            # Only remove the trimmed copy, never the caller's own file.
            if clipped_ref_path != ref_audio_path:
                _remove_quietly(clipped_ref_path)

    response = client.audio.speech.complete(**kwargs)
    audio_bytes = base64.b64decode(response.audio_data)

    # Write to temp file; delete=False so the path stays valid after closing.
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{audio_format}") as tmp:
        tmp.write(audio_bytes)
    return tmp.name, len(audio_bytes)


# ─── Voice Cloning ────────────────────────────────────────────────────────────
def _download_audio_from_url(url, work_dir):
    """Fetch audio from *url* into *work_dir* and return the expected file path.

    Direct audio links are streamed with ``requests``; anything else is handed
    to yt-dlp, which extracts an MP3.
    """
    base_out = os.path.join(work_dir, "sample")

    # If it's a direct audio file link, bypass yt-dlp and download it directly
    if url.lower().endswith(_DIRECT_AUDIO_EXTENSIONS):
        ext = url.rsplit('.', 1)[-1]
        target = f"{base_out}.{ext}"
        with requests.get(url, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(target, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return target

    # Otherwise use yt-dlp (imported lazily so it is only required for this path)
    import yt_dlp

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': base_out + '.%(ext)s',
        'quiet': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '128',
        }],
        'postprocessor_args': [
            '-t', str(_MAX_SAMPLE_SECONDS)  # Hard Limit to 25 seconds to bypass API 30s limit
        ],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.extract_info(url, download=True)
    return base_out + '.mp3'


def clone_voice(audio_path, url_input, voice_name, gender, languages_str):
    """Upload a sample audio or provide a URL to create a reusable cloned voice.

    Args:
        audio_path: path of an uploaded clip; ignored when *url_input* is non-empty.
        url_input: direct audio link or any URL yt-dlp can handle (may be empty).
        voice_name: display name for the new voice.
        gender: gender label; sent lower-cased.
        languages_str: comma-separated language codes; defaults to ``["en"]``
            when empty.

    Returns:
        The object returned by ``client.audio.voices.create``.

    Raises:
        ValueError: if neither an audio clip nor a URL is supplied.
        KeyError: if *voice_name* is empty.
    """
    url = (url_input or "").strip()
    name = (voice_name or "").strip()

    if not audio_path and not url:
        raise ValueError("Please upload an audio clip or provide a media URL.")
    if not name:
        # NOTE(review): KeyError is unusual for input validation; kept because
        # existing callers may catch it.
        raise KeyError("Please enter a name for the voice.")

    # Downloads live in a private directory that is removed on exit, including
    # when a later step raises.
    with tempfile.TemporaryDirectory() as work_dir:
        source_path = _download_audio_from_url(url, work_dir) if url else audio_path

        # Ensure any direct MP3 or uploaded file is ALSO strictly trimmed
        trimmed_path = trim_audio_if_needed(source_path, max_seconds=_MAX_SAMPLE_SECONDS)
        try:
            client = get_client()
            sample_b64 = base64.b64encode(Path(trimmed_path).read_bytes()).decode()
            langs = [l.strip() for l in (languages_str or "").split(",") if l.strip()] or ["en"]

            return client.audio.voices.create(
                name=name,
                sample_audio=sample_b64,
                sample_filename=Path(trimmed_path).name,
                languages=langs,
                gender=gender.lower(),
            )
        finally:
            # Remove the trimmed copy but never the user's uploaded file.
            if trimmed_path != source_path:
                _remove_quietly(trimmed_path)