Spaces:
Sleeping
Sleeping
fix(core): migrate extraction to PCM-WAV to bypass libmp3lame missing codec and enforce 25s limit on cloning endpoints
e5362b5 verified | import os | |
| import base64 | |
| import tempfile | |
| import requests | |
| import subprocess | |
| from pathlib import Path | |
| from mistralai.client import Mistral | |
# ─── Client ──────────────────────────────────────────────────────────────────
def get_client():
    """Build a Mistral API client from the ``MISTRAL_API_KEY`` environment variable.

    Returns:
        A ``Mistral`` client constructed with the key read from the environment.

    Raises:
        Exception: If ``MISTRAL_API_KEY`` is unset or empty.
    """
    key = os.environ.get("MISTRAL_API_KEY")
    if key:
        return Mistral(api_key=key)
    # An empty string counts as "not set", same as a missing variable.
    raise Exception("MISTRAL_API_KEY secret is not set. Please set it as an environment variable.")
# ─── Utility ─────────────────────────────────────────────────────────────────
def trim_audio_if_needed(audio_path, max_seconds=25):
    """Re-encode ``audio_path`` as 16-bit PCM WAV capped at ``max_seconds`` via ffmpeg.

    Despite the name, the input is always transcoded; there is no duration
    probe. The result is written to a fresh temporary ``.wav`` file that the
    caller is responsible for deleting.

    Args:
        audio_path: Path of the source audio file handed to ``ffmpeg -i``.
        max_seconds: Maximum duration kept, passed to ffmpeg's ``-t`` option.

    Returns:
        The path of the new temporary WAV file on success. If ffmpeg cannot
        be run or exits with an error, ``audio_path`` itself is returned
        unchanged (best-effort fallback) and a warning is printed.
    """
    # mkstemp creates the file atomically; the previous tempfile.mktemp only
    # returned a name, which is deprecated and open to a filename race.
    fd, out_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)  # ffmpeg reopens the path itself (-y lets it overwrite)
    try:
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", audio_path,
                "-t", str(max_seconds),
                "-c:a", "pcm_s16le",
                out_path,
            ],
            check=True,
        )
    except Exception as e:
        # Deliberately broad: trimming is best-effort and callers expect the
        # original path back on any failure (ffmpeg missing, bad input, ...).
        # Remove the empty/partial output so failed attempts do not pile up.
        try:
            os.remove(out_path)
        except OSError:
            pass
        print(f"Warning: Failed to trim audio, returning original: {e}")
        return audio_path
    return out_path
def list_user_voices():
    """Return a Markdown summary of the voices returned by the voices API.

    Requests the first 100 voices (``limit=100, offset=0``) and renders one
    bullet per voice with its name, ID and languages. Never raises: any
    failure is reported as an ``"Error fetching voices: ..."`` string.

    Returns:
        A Markdown string, a "no voices" notice when the reported total is
        zero, or an error message.
    """
    try:
        listing = get_client().audio.voices.list(limit=100, offset=0)
        if listing.total == 0:
            return "No voices found in your account."

        chunks = [f"**Total Voices:** {listing.total}\n\n"]
        for voice in listing.items:
            # Attributes are read in the same order as they appear in the
            # rendered line: name, id, then languages.
            voice_name = voice.name
            voice_ident = voice.id
            if hasattr(voice, 'languages'):
                language_text = ', '.join(voice.languages)
            else:
                language_text = 'unknown'
            chunks.append(
                f"- **{voice_name}**\n - ID: `{voice_ident}`\n - Languages: {language_text}\n"
            )
        return "".join(chunks)
    except Exception as err:
        return f"Error fetching voices: {str(err)}"
def get_voice_choices():
    """Return ``(label, voice_id)`` pairs for the official preset voices.

    Requests the first 100 voices and keeps only those whose name starts
    with one of the official speaker names (Paul, Oliver, Jane, Marie) and
    contains ``" - "``, so that user-cloned voices are hidden.

    Returns:
        A list of ``(name, id)`` tuples; an empty list if the lookup fails
        for any reason (a warning is printed in that case).
    """
    official_names = ("Paul", "Oliver", "Jane", "Marie")
    try:
        res = get_client().audio.voices.list(limit=100, offset=0)
        return [
            (v.name, v.id)
            for v in res.items
            if v.name.startswith(official_names) and " - " in v.name
        ]
    except Exception as e:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and hid the reason the choice list came back empty.
        print(f"Warning: Failed to load voice choices: {e}")
        return []
# ─── STT ─────────────────────────────────────────────────────────────────────
def transcribe_audio(audio_path, language):
    """Transcribe an audio file to text with the ``voxtral-mini-latest`` model.

    Args:
        audio_path: Path of the audio file to upload; must not be ``None``.
        language: Language hint forwarded to the API. The sentinel
            ``"Auto-detect"`` (or any falsy value) means no hint is sent.

    Returns:
        The ``text`` attribute of the transcription response.

    Raises:
        ValueError: If ``audio_path`` is ``None``.
    """
    if audio_path is None:
        raise ValueError("Please provide an audio file path.")

    client = get_client()
    send_language = language != "Auto-detect" and bool(language)

    request = {"model": "voxtral-mini-latest"}
    with open(audio_path, "rb") as audio_file:
        request["file"] = {"content": audio_file, "file_name": Path(audio_path).name}
        if send_language:
            request["language"] = language
        # The call must happen while the file handle is still open.
        result = client.audio.transcriptions.complete(**request)
    return result.text
# ─── TTS ─────────────────────────────────────────────────────────────────────
def synthesize_speech(text, voice_id=None, ref_audio_path=None, audio_format="mp3"):
    """Synthesize ``text`` to speech with the ``voxtral-mini-tts-2603`` model.

    Args:
        text: Text to speak; must contain non-whitespace characters.
        voice_id: Optional ID of a stored voice.
        ref_audio_path: Optional path to a reference clip for zero-shot
            voice cloning. It is re-encoded/trimmed to 25 seconds before
            being sent as base64.
        audio_format: Value sent as ``response_format``; also used as the
            suffix of the output file.

    Returns:
        ``(path, size)``: the path of a temporary file holding the decoded
        audio (not deleted by this function) and its size in bytes.

    Raises:
        ValueError: If ``text`` is empty/blank/``None``, or if neither
            ``voice_id`` nor ``ref_audio_path`` is provided.
    """
    # ``not text`` also covers None, which previously crashed on ``.strip()``.
    if not text or not text.strip():
        raise ValueError("Please enter some text.")

    client = get_client()

    # Checked before any audio processing; it can only fire when there is
    # no reference audio, so the ordering matches the previous behaviour.
    if not voice_id and not ref_audio_path:
        raise ValueError("Mistral API requires a voice! Please provide either a reference audio or a valid Voice ID.")

    kwargs = dict(
        model="voxtral-mini-tts-2603",
        input=text,
        response_format=audio_format,
    )
    if voice_id:
        kwargs["voice_id"] = voice_id

    # Add reference audio for zero-shot tone/voice cloning.
    if ref_audio_path:
        clipped_ref_path = trim_audio_if_needed(ref_audio_path, max_seconds=25)
        try:
            with open(clipped_ref_path, "rb") as f:
                kwargs["ref_audio"] = base64.b64encode(f.read()).decode("utf-8")
        finally:
            # Only delete the temporary clip, never the caller's own file
            # (the trim helper returns the original path when it fails).
            if clipped_ref_path != ref_audio_path:
                try:
                    os.remove(clipped_ref_path)
                except OSError:
                    pass

    response = client.audio.speech.complete(**kwargs)
    audio_bytes = base64.b64decode(response.audio_data)

    # delete=False: the caller receives the path and owns the file.
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{audio_format}") as tmp:
        tmp.write(audio_bytes)
    return tmp.name, len(audio_bytes)
# ─── Voice Cloning ───────────────────────────────────────────────────────────
_DIRECT_AUDIO_SUFFIXES = ('.mp3', '.wav', '.flac', '.ogg', '.m4a')


def _fetch_audio_from_url(url, workdir):
    """Download the audio behind ``url`` into ``workdir`` and return the local path."""
    base_out = os.path.join(workdir, "download")

    # A direct audio file link bypasses yt-dlp and is streamed to disk.
    if url.lower().endswith(_DIRECT_AUDIO_SUFFIXES):
        target = f"{base_out}.{url.rsplit('.', 1)[-1]}"
        with requests.get(url, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(target, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return target

    # Anything else is treated as a media page and handed to yt-dlp.
    import yt_dlp

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': base_out + '.%(ext)s',
        'quiet': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '128',
        }],
        'postprocessor_args': [
            '-t', '25'  # Hard limit to 25 seconds to stay under the API's 30s limit
        ],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.extract_info(url, download=True)
    # The FFmpegExtractAudio post-processor writes <base>.mp3.
    return base_out + '.mp3'


def clone_voice(audio_path, url_input, voice_name, gender, languages_str):
    """Create a reusable cloned voice from an uploaded clip or a media URL.

    When a URL is given it takes precedence over ``audio_path``. The chosen
    audio is re-encoded/trimmed to 25 seconds and uploaded as base64.

    Args:
        audio_path: Path of an uploaded sample, or a falsy value when a URL
            is used instead.
        url_input: Direct audio link (.mp3/.wav/.flac/.ogg/.m4a) or any URL
            yt-dlp can extract audio from. May be empty or ``None``.
        voice_name: Display name for the new voice; must not be blank.
        gender: Gender label; lower-cased before being sent.
        languages_str: Comma-separated language codes; defaults to ``["en"]``
            when empty.

    Returns:
        The object returned by ``client.audio.voices.create``.

    Raises:
        ValueError: If neither an audio file nor a URL is provided.
        KeyError: If ``voice_name`` is blank.
    """
    # Treat None like "" so an unset field does not crash on ``.strip()``.
    url = (url_input or "").strip()
    name = (voice_name or "").strip()

    if not audio_path and not url:
        raise ValueError("Please upload an audio clip or provide a media URL.")
    if not name:
        # NOTE(review): KeyError is kept so existing callers keep working, but
        # ValueError would fit better (str(KeyError) wraps the text in quotes).
        raise KeyError("Please enter a name for the voice.")

    # NOTE(review): ``url`` is user-supplied and fetched from this process;
    # consider restricting schemes/hosts if this runs on a shared server.

    # Downloads live in a private directory that is removed on exit, even if
    # the API call fails (replaces the race-prone tempfile.mktemp).
    with tempfile.TemporaryDirectory() as workdir:
        source_path = _fetch_audio_from_url(url, workdir) if url else audio_path

        # Ensure any direct download or uploaded file is also strictly trimmed.
        final_audio_path = trim_audio_if_needed(source_path, max_seconds=25)
        try:
            client = get_client()
            sample_b64 = base64.b64encode(Path(final_audio_path).read_bytes()).decode()
            langs = [l.strip() for l in (languages_str or "").split(",") if l.strip()] or ["en"]

            return client.audio.voices.create(
                name=name,
                sample_audio=sample_b64,
                sample_filename=Path(final_audio_path).name,
                languages=langs,
                gender=gender.lower(),
            )
        finally:
            # Remove only the temporary trimmed copy; when trimming fails the
            # helper returns the source path, which may be the user's upload.
            if final_audio_path != source_path:
                try:
                    os.remove(final_audio_path)
                except OSError:
                    pass