# XTTS123 / app.py
# Last commit by Arabi32: "Update app.py" (ce47ec4, verified)
import os, sys, time, json, uuid, shutil, threading, subprocess
import nest_asyncio
import uvicorn
from fastapi import FastAPI, Form, File, UploadFile, HTTPException
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import HfApi, snapshot_download
# ── Environment and configuration ────────────────────────────────────────
# Set before the TTS package is imported further down; presumably this lets
# Coqui skip its interactive terms-of-service prompt — confirm.
os.environ["COQUI_TOS_AGREED"] = "1"

# On-disk layout: everything persistent lives under DATA_DIR.
DATA_DIR = "data"
VOICE_LIB = os.path.join(DATA_DIR, "voice_library")
OUTPUT_DIR = os.path.join(DATA_DIR, "outputs")
HISTORY_FILE = os.path.join(DATA_DIR, "history.json")

# Create the directory tree up front so later reads/writes never hit a
# missing folder.
for d in (DATA_DIR, VOICE_LIB, OUTPUT_DIR):
    os.makedirs(d, exist_ok=True)

# Optional Hugging Face credentials; both must be set for dataset sync.
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO")
def pull_data_from_hf():
    """Download the dataset snapshot into ``DATA_DIR`` at startup.

    Does nothing unless both ``HF_TOKEN`` and ``HF_DATASET_REPO`` are set.
    Any failure is printed and otherwise ignored, so a missing or empty
    dataset does not stop the application from starting.
    """
    if not (HF_TOKEN and HF_DATASET_REPO):
        return

    download_args = {
        "repo_id": HF_DATASET_REPO,
        "repo_type": "dataset",
        "local_dir": DATA_DIR,
        "token": HF_TOKEN,
    }
    try:
        print(f"[*] جاري تحميل البيانات من: {HF_DATASET_REPO}...")
        snapshot_download(**download_args)
        print("[✓] تم التحميل بنجاح.")
    except Exception as err:
        # Best-effort sync: report the problem and carry on with local data.
        print(f"[!] تنبيه: لم يتم مزامنة البيانات (قد تكون المساحة جديدة): {err}")
def push_data_to_hf():
    """Upload the whole ``DATA_DIR`` folder to the Hugging Face dataset.

    Does nothing unless both ``HF_TOKEN`` and ``HF_DATASET_REPO`` are set.
    Errors are printed and swallowed; this function is the target of the
    thread started by ``background_sync``.
    """
    if not (HF_TOKEN and HF_DATASET_REPO):
        return

    try:
        client = HfApi()
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        client.upload_folder(
            folder_path=DATA_DIR,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Auto-sync {stamp}",
        )
    except Exception as err:
        # Best-effort sync: a failed upload is only reported.
        print(f"[!] خطأ في المزامنة الخلفية: {err}")
def background_sync():
    """Start ``push_data_to_hf`` on a daemon thread and return immediately."""
    worker = threading.Thread(target=push_data_to_hf, daemon=True)
    worker.start()
# Initial pull: runs at import time, before the model is loaded.
pull_data_from_hf()

# ── TTS engine setup ──────────────────────────────────────────────────────
try:
    from TTS.api import TTS
except ImportError:
    # The package is missing: install it with pip for the running
    # interpreter, then retry the import.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "coqui-tts"])
    from TTS.api import TTS

import torch

# Use the GPU when torch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print(f"[*] جاري تحميل نموذج XTTS v2 على {device.upper()}...")
xtts_engine = TTS("tts_models/multilingual/multi-dataset/xtts_v2")
xtts_engine = xtts_engine.to(device)
print("[✓] النموذج جاهز للاستخدام.")
# ── Helpers ───────────────────────────────────────────────────────────────
def load_history():
    """Return the saved generation history, or an empty list if unavailable.

    Reads ``HISTORY_FILE`` as UTF-8 JSON. A missing, unreadable, or malformed
    file is treated as "no history yet" rather than as an error. A file whose
    top-level JSON value is not a list is treated the same way, because
    callers append to the returned value.

    Returns:
        list: The stored history entries, in file order.
    """
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: the file is missing or unreadable.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return []
    return data if isinstance(data, list) else []
def save_history(h):
    """Overwrite ``HISTORY_FILE`` with ``h`` as indented UTF-8 JSON.

    Args:
        h: JSON-serializable history (the list returned by ``load_history``
            plus any appended entries).
    """
    encoder = json.JSONEncoder(ensure_ascii=False, indent=2)
    with open(HISTORY_FILE, "w", encoding="utf-8") as out:
        # Stream the encoded chunks straight to the file.
        for chunk in encoder.iterencode(h):
            out.write(chunk)
# ── FastAPI application ───────────────────────────────────────────────────
app = FastAPI(title="XTTS Studio Pro")

# CORS is left fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Language code -> display name shown in the UI's language selector.
LANGUAGES = {
    "ar": "العربية",
    "en": "English",
    "es": "Español",
    "fr": "Français",
    "de": "Deutsch",
    "it": "Italiano",
    "pt": "Português",
    "ru": "Русский",
    "zh-cn": "中文",
    "ja": "日本語",
    "ko": "한국어",
    "tr": "Türkçe",
}
# ── User interface (React) ────────────────────────────────────────────────
# Single-page UI served by the "/" route. The two placeholders
# "{LANGUAGES_JSON}" and "{DEVICE_NAME}" are substituted with str.replace()
# at request time, so every other brace in the template is plain JS/JSX/CSS.
# handleGenerate checks res.ok before reading the JSON body, so an HTTP error
# reaches the catch block instead of producing an "/audio/undefined" player.
HTML_TEMPLATE = r"""
<!DOCTYPE html>
<html lang="ar" dir="rtl">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>XTTS Voice Studio</title>
<link href="https://fonts.googleapis.com/css2?family=IBM+Plex+Sans+Arabic:wght@300;400;600;700&display=swap" rel="stylesheet">
<script src="https://cdn.tailwindcss.com"></script>
<script src="https://unpkg.com/react@18/umd/react.production.min.js"></script>
<script src="https://unpkg.com/react-dom@18/umd/react-dom.production.min.js"></script>
<script src="https://unpkg.com/@babel/standalone/babel.min.js"></script>
<style>
:root { --bg: #0b0c0f; --surface: #13151a; --border: #1f2330; --amber: #f5a623; --text: #e8eaf0; --muted: #6b7280; }
body { background: var(--bg); color: var(--text); font-family: 'IBM Plex Sans Arabic', sans-serif; }
.card { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; }
.btn-primary { background: var(--amber); color: #000; font-weight: 700; border-radius: 8px; transition: all 0.2s; }
.btn-primary:hover { opacity: 0.9; transform: translateY(-1px); }
.btn-ghost { border: 1px solid var(--border); color: var(--muted); border-radius: 6px; transition: all 0.2s; }
.btn-ghost:hover { border-color: var(--amber); color: var(--text); }
.tab-active { background: rgba(245,166,35,0.1); color: var(--amber); border: 1px solid var(--amber); }
audio { width: 100%; border-radius: 8px; filter: invert(90%) hue-rotate(180deg); }
</style>
</head>
<body>
<div id="root"></div>
<script type="text/babel">
const { useState, useEffect } = React;
const device = "{DEVICE_NAME}";
function App() {
const [tab, setTab] = useState("generate");
const [text, setText] = useState("");
const [lang, setLang] = useState("ar");
const [file1, setFile1] = useState(null);
const [voices, setVoices] = useState([]);
const [selVoice, setSelVoice] = useState(null);
const [loading, setLoading] = useState(false);
const [audioUrl, setAudioUrl] = useState(null);
const [history, setHistory] = useState([]);
useEffect(() => {
refreshData();
}, []);
const refreshData = () => {
fetch("/voices").then(r => r.json()).then(setVoices);
fetch("/history").then(r => r.json()).then(setHistory);
};
const handleGenerate = async () => {
if (!text || (!file1 && !selVoice)) return alert("يرجى إدخال النص واختيار صوت مرجعي");
setLoading(true);
const fd = new FormData();
fd.append("text", text);
fd.append("language", lang);
if (file1) fd.append("files", file1);
if (selVoice) fd.append("voice_name", selVoice);
try {
const res = await fetch("/generate", { method: "POST", body: fd });
if (!res.ok) throw new Error(`HTTP ${res.status}`);
const data = await res.json();
setAudioUrl(`/audio/${data.filename}`);
refreshData();
} catch (e) { alert("حدث خطأ أثناء التوليد"); }
setLoading(false);
};
return (
<div className="max-w-3xl mx-auto p-6">
<header className="text-center mb-8">
<h1 className="text-3xl font-bold text-[#f5a623]">استوديو XTTS v2</h1>
<p className="text-sm text-gray-500 mt-2">يعمل على: <span className="uppercase text-green-500">{device}</span></p>
</header>
<nav className="flex gap-2 mb-6 card p-1">
{[{id:"generate", n:"⚡ توليد"}, {id:"library", n:"📚 المكتبة"}, {id:"history", n:"🕘 السجل"}].map(t => (
<button key={t.id} onClick={() => setTab(t.id)} className={`flex-1 py-2 rounded-lg transition ${tab === t.id ? 'tab-active' : 'text-gray-500'}`}>
{t.n}
</button>
))}
</nav>
{tab === "generate" && (
<div className="space-y-4">
<div className="card p-4">
<div className="flex justify-between mb-2">
<label className="text-xs text-gray-400 uppercase">النص المراد تحويله</label>
<select className="bg-transparent text-sm text-amber-500 outline-none" value={lang} onChange={e => setLang(e.target.value)}>
{Object.entries({LANGUAGES_JSON}).map(([k, v]) => <option key={k} value={k}>{v}</option>)}
</select>
</div>
<textarea className="w-full bg-[#0b0c0f] border border-[#1f2330] rounded-lg p-3 outline-none focus:border-amber-500 transition" rows="4" value={text} onChange={e => setText(e.target.value)} placeholder="اكتب ما تريد هنا..."></textarea>
</div>
<div className="card p-4">
<label className="text-xs text-gray-400 uppercase block mb-3">الصوت المرجعي</label>
<input type="file" accept="audio/*" onChange={e => setFile1(e.target.files[0])} className="text-sm text-gray-500 mb-4 block w-full" />
{voices.length > 0 && (
<div className="flex flex-wrap gap-2">
{voices.map(v => (
<button key={v} onClick={() => setSelVoice(v === selVoice ? null : v)} className={`px-3 py-1 rounded-full text-xs border ${selVoice === v ? 'border-amber-500 text-amber-500 bg-amber-500/10' : 'border-gray-700 text-gray-500'}`}>
{v}
</button>
))}
</div>
)}
</div>
<button onClick={handleGenerate} disabled={loading} className="btn-primary w-full py-4 text-lg">
{loading ? "جاري المعالجة..." : "توليد الصوت الآن"}
</button>
{audioUrl && (
<div className="card p-4 mt-4 animate-pulse">
<audio src={audioUrl} controls autoPlay />
<a href={audioUrl} download className="block text-center mt-2 text-xs text-amber-500">تحميل الملف المولد</a>
</div>
)}
</div>
)}
{tab === "history" && (
<div className="space-y-2">
{history.length === 0 ? <p className="text-center text-gray-600 py-10">لا يوجد سجل حالياً</p> :
history.slice().reverse().map((h, i) => (
<div key={i} className="card p-3 flex justify-between items-center">
<div className="overflow-hidden">
<p className="text-sm truncate w-64">{h.text}</p>
<span className="text-[10px] text-gray-600 uppercase">{h.language} • {new Date(h.ts*1000).toLocaleTimeString()}</span>
</div>
<button onClick={() => { setAudioUrl(`/audio/${h.filename}`); setTab("generate"); }} className="btn-ghost px-3 py-1 text-xs">تشغيل</button>
</div>
))}
</div>
)}
</div>
);
}
ReactDOM.createRoot(document.getElementById("root")).render(<App />);
</script>
</body>
</html>
"""
# ── Routes ────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def ui():
    """Serve the single-page UI with its two placeholders filled in.

    ``{LANGUAGES_JSON}`` becomes the ``LANGUAGES`` mapping as JSON and
    ``{DEVICE_NAME}`` becomes the torch device string chosen at startup.
    """
    languages_json = json.dumps(LANGUAGES, ensure_ascii=False)
    page = HTML_TEMPLATE.replace("{LANGUAGES_JSON}", languages_json)
    return page.replace("{DEVICE_NAME}", device)
@app.post("/generate")
async def generate(
    text: str = Form(...),
    language: str = Form("ar"),
    voice_name: str = Form(None),
    files: list[UploadFile] = File(default=[]),
):
    """Synthesize ``text`` using uploaded and/or saved reference audio.

    Args:
        text: Text to synthesize.
        language: Language code passed to the engine (defaults to ``"ar"``).
        voice_name: Optional name of a folder under ``VOICE_LIB``; its
            ``.wav``/``.mp3`` files are added to the reference clips.
        files: Optional uploaded reference clips.

    Returns:
        dict: ``{"filename": <name>}`` where the file lives in ``OUTPUT_DIR``
        and is served by the ``/audio/{filename}`` route.

    Raises:
        HTTPException: 400 when no usable reference audio was provided.
    """
    ref_paths = []
    temp_dir = f"temp_{uuid.uuid4().hex}"
    os.makedirs(temp_dir, exist_ok=True)
    try:
        # Uploaded reference clips.
        for index, upload in enumerate(files):
            # The filename is client-controlled: keep only its final path
            # component so an absolute path or "../" cannot write outside
            # temp_dir. Backslashes are normalised for Windows-style names.
            safe_name = os.path.basename((upload.filename or "").replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                # Nameless part: nothing usable to store.
                continue
            # The index prefix keeps two uploads with the same name apart.
            dest = os.path.join(temp_dir, f"{index}_{safe_name}")
            with open(dest, "wb") as out:
                shutil.copyfileobj(upload.file, out)
            ref_paths.append(dest)

        # Saved voices: accept only a bare folder name inside VOICE_LIB.
        if (
            voice_name
            and voice_name not in (".", "..")
            and voice_name == os.path.basename(voice_name.replace("\\", "/"))
        ):
            v_path = os.path.join(VOICE_LIB, voice_name)
            if os.path.isdir(v_path):
                # Sorted so the reference order does not depend on the
                # directory listing order.
                ref_paths += [
                    os.path.join(v_path, x)
                    for x in sorted(os.listdir(v_path))
                    if x.lower().endswith((".wav", ".mp3"))
                ]

        if not ref_paths:
            raise HTTPException(400, "يجب توفير صوت مرجعي")

        out_name = f"gen_{uuid.uuid4().hex[:8]}.wav"
        out_path = os.path.join(OUTPUT_DIR, out_name)
        # NOTE(review): this call is synchronous, so the event loop is
        # blocked while it runs.
        xtts_engine.tts_to_file(
            text=text,
            speaker_wav=ref_paths,
            language=language,
            file_path=out_path,
            enable_text_splitting=True,
        )

        # Record the generation (text truncated to 100 characters).
        hist = load_history()
        hist.append(
            {
                "filename": out_name,
                "text": text[:100],
                "language": language,
                "ts": int(time.time()),
            }
        )
        save_history(hist)
        background_sync()
        return {"filename": out_name}
    finally:
        # Always drop the per-request upload directory.
        shutil.rmtree(temp_dir, ignore_errors=True)
@app.get("/audio/{filename}")
async def get_audio(filename: str):
    """Serve a file from ``OUTPUT_DIR``.

    Args:
        filename: Bare file name taken from the URL path.

    Returns:
        FileResponse: The requested file.

    Raises:
        HTTPException: 404 when the name is not a plain file name or no
            regular file with that name exists in ``OUTPUT_DIR``.
    """
    # The name comes straight from the URL: reject anything that is not a
    # single path component so the lookup cannot leave OUTPUT_DIR.
    if filename in ("", ".", "..") or filename != os.path.basename(filename.replace("\\", "/")):
        raise HTTPException(404)
    path = os.path.join(OUTPUT_DIR, filename)
    # isfile (not exists): a directory must not be handed to FileResponse.
    if not os.path.isfile(path):
        raise HTTPException(404)
    return FileResponse(path)
@app.get("/history")
def get_history():
    """Return the stored generation history."""
    entries = load_history()
    return entries
@app.get("/voices")
def list_voices():
    """Return the names of the sub-directories of ``VOICE_LIB``."""
    voices = []
    for entry in os.listdir(VOICE_LIB):
        # Only folders count as voices; loose files are skipped.
        if os.path.isdir(os.path.join(VOICE_LIB, entry)):
            voices.append(entry)
    return voices
@app.post("/voices/save")
async def save_voice(name: str = Form(...), file: UploadFile = File(...)):
    """Store an uploaded reference clip under ``VOICE_LIB/<name>/``.

    Args:
        name: Voice name; surrounding whitespace is stripped. It must be a
            single path component.
        file: Uploaded audio clip, saved under its own base file name.

    Returns:
        dict: ``{"status": "saved"}`` on success.

    Raises:
        HTTPException: 400 when the voice name or the upload's file name is
            empty or is not a plain name.
    """
    # Both values are client-controlled and end up in a filesystem path:
    # allow only bare names so nothing can be written outside VOICE_LIB.
    voice = name.strip()
    if voice in ("", ".", "..") or voice != os.path.basename(voice.replace("\\", "/")):
        raise HTTPException(400, "اسم الصوت غير صالح")

    upload_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if upload_name in ("", ".", ".."):
        raise HTTPException(400, "اسم الملف غير صالح")

    v_dir = os.path.join(VOICE_LIB, voice)
    os.makedirs(v_dir, exist_ok=True)
    dest = os.path.join(v_dir, upload_name)
    with open(dest, "wb") as out:
        shutil.copyfileobj(file.file, out)

    background_sync()
    return {"status": "saved"}
# ── Entry point ───────────────────────────────────────────────────────────
def _serve():
    """Run the app with uvicorn on all interfaces, port 7860."""
    # Applied before uvicorn starts; presumably so the server can run where
    # an event loop is already active (e.g. a notebook) — confirm.
    nest_asyncio.apply()
    uvicorn.run(app, host="0.0.0.0", port=7860)


if __name__ == "__main__":
    _serve()