Spaces:
Sleeping
Sleeping
refactor: decouple API functions to core.py
Browse files
app.py
CHANGED
|
@@ -2,180 +2,39 @@ import os
|
|
| 2 |
import base64
|
| 3 |
import tempfile
|
| 4 |
import gradio as gr
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
from pathlib import Path
|
| 6 |
-
import requests
|
| 7 |
-
from mistralai.client import Mistral
|
| 8 |
|
| 9 |
-
|
| 10 |
-
|
| 11 |
-
|
| 12 |
-
|
| 13 |
-
|
| 14 |
-
|
| 15 |
-
return "No voices found in your account."
|
| 16 |
-
out = f"**Total Voices:** {result.total}\n\n"
|
| 17 |
-
for voice in result.items:
|
| 18 |
-
out += f"- **{voice.name}**\n - ID: `{voice.id}`\n - Languages: {', '.join(voice.languages) if hasattr(voice, 'languages') else 'unknown'}\n"
|
| 19 |
-
return out
|
| 20 |
-
except Exception as e:
|
| 21 |
-
return f"Error fetching voices: {str(e)}"
|
| 22 |
|
| 23 |
-
|
| 24 |
-
|
| 25 |
-
|
| 26 |
-
res = client.audio.voices.list(limit=100, offset=0)
|
| 27 |
-
# Filter for Official Mistral Voices (Paul, Oliver, Jane, Marie) so we hide randomly cloned user voices
|
| 28 |
-
official_names = ("Paul", "Oliver", "Jane", "Marie")
|
| 29 |
-
official = []
|
| 30 |
-
for v in res.items:
|
| 31 |
-
if v.name.startswith(official_names) and " - " in v.name:
|
| 32 |
-
official.append((f"{v.name}", v.id))
|
| 33 |
-
return official
|
| 34 |
-
except:
|
| 35 |
-
return []
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
# βββ Client βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 39 |
-
def get_client():
|
| 40 |
-
api_key = os.environ.get("MISTRAL_API_KEY")
|
| 41 |
-
if not api_key:
|
| 42 |
-
raise gr.Error("MISTRAL_API_KEY secret is not set. Add it in Space Settings β Secrets.")
|
| 43 |
-
return Mistral(api_key=api_key)
|
| 44 |
-
|
| 45 |
-
|
| 46 |
-
# βββ STT ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 47 |
-
def transcribe_audio(audio_path, language):
|
| 48 |
-
"""Convert audio file β text using Voxtral Mini Transcribe."""
|
| 49 |
-
if audio_path is None:
|
| 50 |
return "β οΈ Please record or upload an audio file first."
|
| 51 |
try:
|
| 52 |
-
|
| 53 |
-
lang_param = language if language != "Auto-detect" else None
|
| 54 |
-
with open(audio_path, "rb") as f:
|
| 55 |
-
kwargs = dict(
|
| 56 |
-
model="voxtral-mini-latest",
|
| 57 |
-
file={"content": f, "file_name": Path(audio_path).name},
|
| 58 |
-
)
|
| 59 |
-
if lang_param:
|
| 60 |
-
kwargs["language"] = lang_param
|
| 61 |
-
response = client.audio.transcriptions.complete(**kwargs)
|
| 62 |
-
return response.text
|
| 63 |
except Exception as e:
|
| 64 |
return f"β Error: {str(e)}"
|
| 65 |
|
| 66 |
-
|
| 67 |
-
# βββ TTS ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 68 |
-
BUILTIN_VOICES = {
|
| 69 |
-
"Default (no voice clone)": None,
|
| 70 |
-
}
|
| 71 |
-
|
| 72 |
-
def synthesize_speech(text, voice_id_input, ref_audio_path, audio_format):
|
| 73 |
-
"""Convert text β speech using Voxtral Mini TTS."""
|
| 74 |
-
if not text.strip():
|
| 75 |
-
return None, "β οΈ Please enter some text."
|
| 76 |
try:
|
| 77 |
-
client = get_client()
|
| 78 |
voice_id = voice_id_input.strip() if voice_id_input and voice_id_input.strip() else None
|
| 79 |
-
|
| 80 |
-
|
| 81 |
-
model="voxtral-mini-tts-2603",
|
| 82 |
-
input=text,
|
| 83 |
-
response_format=audio_format,
|
| 84 |
-
)
|
| 85 |
-
if voice_id:
|
| 86 |
-
kwargs["voice_id"] = voice_id
|
| 87 |
-
|
| 88 |
-
# Add Reference Audio for Zero-shot tone/voice cloning
|
| 89 |
-
if ref_audio_path:
|
| 90 |
-
with open(ref_audio_path, "rb") as f:
|
| 91 |
-
ref_audio_b64 = base64.b64encode(f.read()).decode("utf-8")
|
| 92 |
-
kwargs["ref_audio"] = ref_audio_b64
|
| 93 |
-
if not voice_id and not ref_audio_path:
|
| 94 |
-
raise gr.Error("Mistral API requires a voice! Please either upload a short 'Reference Audio' clip to define the voice tone zero-shot, OR paste a valid Voice ID you cloned in the Voice Cloning tab. There are no built-in standard voices.")
|
| 95 |
-
|
| 96 |
-
response = client.audio.speech.complete(**kwargs)
|
| 97 |
-
audio_bytes = base64.b64decode(response.audio_data)
|
| 98 |
-
|
| 99 |
-
# Write to temp file
|
| 100 |
-
suffix = f".{audio_format}"
|
| 101 |
-
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
|
| 102 |
-
tmp.write(audio_bytes)
|
| 103 |
-
tmp.close()
|
| 104 |
-
return tmp.name, f"β
Generated {len(audio_bytes):,} bytes of {audio_format.upper()} audio."
|
| 105 |
except Exception as e:
|
| 106 |
return None, f"β Error: {str(e)}"
|
| 107 |
|
| 108 |
-
|
| 109 |
-
# βββ Voice Cloning ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 110 |
-
def clone_voice(audio_path, url_input, voice_name, gender, languages_str):
|
| 111 |
-
"""Upload a sample audio or provide a URL to create a reusable cloned voice."""
|
| 112 |
-
if not audio_path and not url_input.strip():
|
| 113 |
-
return "β οΈ Please upload an audio clip or provide a media URL.", gr.update()
|
| 114 |
-
if not voice_name.strip():
|
| 115 |
-
return "β οΈ Please enter a name for the voice.", gr.update()
|
| 116 |
-
|
| 117 |
-
final_audio_path = audio_path
|
| 118 |
-
|
| 119 |
try:
|
| 120 |
-
|
| 121 |
-
if url_input.strip():
|
| 122 |
-
url = url_input.strip()
|
| 123 |
-
base_out = tempfile.mktemp()
|
| 124 |
-
|
| 125 |
-
# If it's a direct audio file link, bypass yt-dlp and download it directly
|
| 126 |
-
if url.lower().endswith(('.mp3', '.wav', '.flac', '.ogg', '.m4a')):
|
| 127 |
-
try:
|
| 128 |
-
ext = url.split('.')[-1]
|
| 129 |
-
final_audio_path = f"{base_out}.{ext}"
|
| 130 |
-
with requests.get(url, stream=True, timeout=15) as r:
|
| 131 |
-
r.raise_for_status()
|
| 132 |
-
with open(final_audio_path, 'wb') as f:
|
| 133 |
-
for chunk in r.iter_content(chunk_size=8192):
|
| 134 |
-
f.write(chunk)
|
| 135 |
-
except Exception as e:
|
| 136 |
-
return f"β Error downloading direct audio link: {str(e)}", gr.update()
|
| 137 |
-
# Otherwise use yt-dlp for TikTok, Twitter, YouTube (if not blocked), etc.
|
| 138 |
-
else:
|
| 139 |
-
import yt_dlp
|
| 140 |
-
ydl_opts = {
|
| 141 |
-
'format': 'bestaudio/best',
|
| 142 |
-
'outtmpl': base_out + '.%(ext)s',
|
| 143 |
-
'quiet': True,
|
| 144 |
-
'postprocessors': [{
|
| 145 |
-
'key': 'FFmpegExtractAudio',
|
| 146 |
-
'preferredcodec': 'mp3',
|
| 147 |
-
'preferredquality': '128',
|
| 148 |
-
}],
|
| 149 |
-
'postprocessor_args': [
|
| 150 |
-
'-t', '60' # Limit to first 60 seconds
|
| 151 |
-
],
|
| 152 |
-
}
|
| 153 |
-
try:
|
| 154 |
-
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
| 155 |
-
info = ydl.extract_info(url, download=True)
|
| 156 |
-
final_audio_path = base_out + '.mp3'
|
| 157 |
-
except Exception as e:
|
| 158 |
-
err_msg = str(e)
|
| 159 |
-
if "Sign in to confirm" in err_msg or "bot" in err_msg.lower() or "youtube" in err_msg.lower():
|
| 160 |
-
raise gr.Error("YouTube blocked the Hugging Face Server. Please use a TikTok/Twitter link, OR paste a direct .MP3 URL, OR upload the file manually.")
|
| 161 |
-
else:
|
| 162 |
-
raise gr.Error(f"Video download failed: {err_msg}")
|
| 163 |
-
|
| 164 |
-
client = get_client()
|
| 165 |
-
sample_b64 = base64.b64encode(Path(final_audio_path).read_bytes()).decode()
|
| 166 |
-
langs = [l.strip() for l in languages_str.split(",") if l.strip()] or ["en"]
|
| 167 |
-
voice = client.audio.voices.create(
|
| 168 |
-
name=voice_name.strip(),
|
| 169 |
-
sample_audio=sample_b64,
|
| 170 |
-
sample_filename=Path(final_audio_path).name,
|
| 171 |
-
languages=langs,
|
| 172 |
-
gender=gender.lower(),
|
| 173 |
-
)
|
| 174 |
-
|
| 175 |
-
# Clean up downloaded file
|
| 176 |
-
if url_input.strip() and os.path.exists(final_audio_path):
|
| 177 |
-
try: os.remove(final_audio_path)
|
| 178 |
-
except: pass
|
| 179 |
# Build new choices specifically for this user session: Official Voices + Their new clone
|
| 180 |
new_session_choices = get_voice_choices() + [(f"{voice.name} (Custom Session Clone)", voice.id)]
|
| 181 |
return (
|
|
@@ -183,7 +42,10 @@ def clone_voice(audio_path, url_input, voice_name, gender, languages_str):
|
|
| 183 |
gr.update(choices=new_session_choices, value=voice.id)
|
| 184 |
)
|
| 185 |
except Exception as e:
|
| 186 |
-
|
|
|
|
|
|
|
|
|
|
| 187 |
|
| 188 |
|
| 189 |
# βββ UI ββββοΏ½οΏ½ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
|
@@ -399,7 +261,7 @@ with gr.Blocks(title="Voxtral Studio β Mistral AI Audio", css=css) as demo:
|
|
| 399 |
)
|
| 400 |
|
| 401 |
stt_btn.click(
|
| 402 |
-
fn=
|
| 403 |
inputs=[stt_audio, stt_language],
|
| 404 |
outputs=stt_output,
|
| 405 |
)
|
|
@@ -451,7 +313,7 @@ with gr.Blocks(title="Voxtral Studio β Mistral AI Audio", css=css) as demo:
|
|
| 451 |
tts_status = gr.Markdown(elem_classes=["status-text"])
|
| 452 |
|
| 453 |
tts_btn.click(
|
| 454 |
-
fn=
|
| 455 |
inputs=[tts_text, tts_voice_id, tts_ref_audio, tts_format],
|
| 456 |
outputs=[tts_audio_out, tts_status],
|
| 457 |
)
|
|
@@ -504,7 +366,7 @@ with gr.Blocks(title="Voxtral Studio β Mistral AI Audio", css=css) as demo:
|
|
| 504 |
)
|
| 505 |
|
| 506 |
clone_btn.click(
|
| 507 |
-
fn=
|
| 508 |
inputs=[clone_audio, clone_url, clone_name, clone_gender, clone_langs],
|
| 509 |
outputs=[clone_result, tts_voice_id],
|
| 510 |
)
|
|
|
|
| 2 |
import base64
|
| 3 |
import tempfile
|
| 4 |
import gradio as gr
|
| 5 |
+
import base64
|
| 6 |
+
import os
|
| 7 |
+
import tempfile
|
| 8 |
+
import gradio as gr
|
| 9 |
from pathlib import Path
|
|
|
|
|
|
|
| 10 |
|
| 11 |
+
from core import (
|
| 12 |
+
get_voice_choices,
|
| 13 |
+
transcribe_audio as core_transcribe,
|
| 14 |
+
synthesize_speech as core_synthesize,
|
| 15 |
+
clone_voice as core_clone
|
| 16 |
+
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 17 |
|
| 18 |
+
# βββ Gradio App Wrappers ββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 19 |
+
def transcribe_handler(audio_path, language):
|
| 20 |
+
if not audio_path:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 21 |
return "β οΈ Please record or upload an audio file first."
|
| 22 |
try:
|
| 23 |
+
return core_transcribe(audio_path, language)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 24 |
except Exception as e:
|
| 25 |
return f"β Error: {str(e)}"
|
| 26 |
|
| 27 |
+
def synthesize_handler(text, voice_id_input, ref_audio_path, audio_format):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 28 |
try:
|
|
|
|
| 29 |
voice_id = voice_id_input.strip() if voice_id_input and voice_id_input.strip() else None
|
| 30 |
+
output_path, num_bytes = core_synthesize(text, voice_id, ref_audio_path, audio_format)
|
| 31 |
+
return output_path, f"β
Generated {num_bytes:,} bytes of {audio_format.upper()} audio."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 32 |
except Exception as e:
|
| 33 |
return None, f"β Error: {str(e)}"
|
| 34 |
|
| 35 |
+
def clone_handler(audio_path, url_input, voice_name, gender, languages_str):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 36 |
try:
|
| 37 |
+
voice = core_clone(audio_path, url_input, voice_name, gender, languages_str)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 38 |
# Build new choices specifically for this user session: Official Voices + Their new clone
|
| 39 |
new_session_choices = get_voice_choices() + [(f"{voice.name} (Custom Session Clone)", voice.id)]
|
| 40 |
return (
|
|
|
|
| 42 |
gr.update(choices=new_session_choices, value=voice.id)
|
| 43 |
)
|
| 44 |
except Exception as e:
|
| 45 |
+
err_msg = str(e)
|
| 46 |
+
if "Sign in to confirm" in err_msg or "bot" in err_msg.lower() or "youtube" in err_msg.lower():
|
| 47 |
+
return "β YouTube blocked the proxy crawler. Please use a TikTok/Twitter link, OR paste a direct .MP3 URL, OR upload the file manually.", gr.update()
|
| 48 |
+
return f"β Error: {err_msg}", gr.update()
|
| 49 |
|
| 50 |
|
| 51 |
# βββ UI ββββοΏ½οΏ½ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
|
|
|
| 261 |
)
|
| 262 |
|
| 263 |
stt_btn.click(
|
| 264 |
+
fn=transcribe_handler,
|
| 265 |
inputs=[stt_audio, stt_language],
|
| 266 |
outputs=stt_output,
|
| 267 |
)
|
|
|
|
| 313 |
tts_status = gr.Markdown(elem_classes=["status-text"])
|
| 314 |
|
| 315 |
tts_btn.click(
|
| 316 |
+
fn=synthesize_handler,
|
| 317 |
inputs=[tts_text, tts_voice_id, tts_ref_audio, tts_format],
|
| 318 |
outputs=[tts_audio_out, tts_status],
|
| 319 |
)
|
|
|
|
| 366 |
)
|
| 367 |
|
| 368 |
clone_btn.click(
|
| 369 |
+
fn=clone_handler,
|
| 370 |
inputs=[clone_audio, clone_url, clone_name, clone_gender, clone_langs],
|
| 371 |
outputs=[clone_result, tts_voice_id],
|
| 372 |
)
|