# Arabic-LLM-TTS / main.py
# Author: MaenGit — uses the qwen2.5:3b model via Ollama (commit caea90c)
import os
import json
import httpx
import asyncio
import logging
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import edge_tts
import uvicorn
import base64
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Enable CORS so the Next.js frontend can call this API from another origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Local Ollama streaming chat endpoint
OLLAMA_URL = "http://localhost:11434/api/chat"
class ChatRequest(BaseModel):
    """Request body for the /stream-voice endpoint."""

    # Chat history in Ollama format: [{"role": "...", "content": "..."}, ...]
    messages: list
    # edge-tts voice name; default is an Arabic (Saudi) neural voice.
    voice: str = "ar-SA-HamedNeural"
    # edge-tts speaking-rate adjustment, e.g. "+10%" or "-5%".
    rate: str = "+0%"
async def stream_text_and_voice(payload, messages, voice, rate):
    """Stream chat tokens from Ollama and interleave per-sentence TTS audio.

    Reads the streaming chat response from Ollama, buffers tokens until a
    sentence boundary, synthesizes each completed sentence with edge-tts,
    and yields newline-delimited JSON lines:

      {"type": "audio", "data": "<base64 audio>"}  -- one per sentence
      {"type": "final_text", "content": "..."}     -- once, at the end
      {"type": "error", "message": "..."}          -- if streaming fails

    Args:
        payload: JSON body forwarded to the Ollama /api/chat endpoint.
        messages: Chat history (kept for interface compatibility; the
            history is already embedded in ``payload``).
        voice: edge-tts voice name (e.g. "ar-SA-HamedNeural").
        rate: edge-tts speaking-rate adjustment (e.g. "+0%").
    """
    # Punctuation that marks a sentence boundary (includes Arabic "؟").
    sentence_end = (".", "!", "?", "؟", "\n")
    full_response_text = ""
    sentence_buffer = ""
    async with httpx.AsyncClient(timeout=None) as client:
        try:
            async with client.stream("POST", OLLAMA_URL, json=payload) as response:
                async for line in response.aiter_lines():
                    if not line:
                        continue
                    try:
                        chunk = json.loads(line)
                    except json.JSONDecodeError:
                        # Skip malformed lines instead of killing the stream.
                        logger.warning("Skipping non-JSON line from Ollama: %r", line)
                        continue
                    token = chunk.get("message", {}).get("content", "")
                    sentence_buffer += token
                    full_response_text += token
                    # Flush the buffer whenever a sentence boundary appears.
                    if any(punct in token for punct in sentence_end):
                        clean_text = sentence_buffer.strip()
                        logger.info("Sentence: %s", clean_text)
                        if clean_text:
                            async for ndjson_line in _synthesize_sentence(clean_text, voice, rate):
                                yield ndjson_line
                        sentence_buffer = ""  # reset the buffer for the next sentence
            # Synthesize any trailing text left in the buffer.
            remainder = sentence_buffer.strip()
            if remainder:
                async for ndjson_line in _synthesize_sentence(remainder, voice, rate):
                    yield ndjson_line
            # IMPORTANT: send the full assistant text at the end so the
            # client can render the complete message.
            yield json.dumps({
                "type": "final_text",
                "content": full_response_text,
            }) + "\n"
        except Exception as e:
            logger.error("Streaming error: %s", e)
            # Tell the client something went wrong instead of silently stopping.
            yield json.dumps({"type": "error", "message": str(e)}) + "\n"


async def _synthesize_sentence(text, voice, rate):
    """Yield one NDJSON audio line for *text* synthesized with edge-tts.

    Accumulates all audio chunks for the sentence in memory, then emits a
    single {"type": "audio", "data": <base64>} line (nothing if edge-tts
    produced no audio).
    """
    communicate = edge_tts.Communicate(text, voice, rate=rate)
    audio_data = b""
    # Renamed from "chunk" to avoid shadowing the Ollama JSON chunk in the caller.
    async for audio_chunk in communicate.stream():
        if audio_chunk["type"] == "audio":
            audio_data += audio_chunk["data"]
    if audio_data:
        yield json.dumps({
            "type": "audio",
            "data": base64.b64encode(audio_data).decode("utf-8"),
        }) + "\n"
@app.post("/stream-voice")
async def voice_engine(data: ChatRequest):
    """Stream an LLM chat answer as newline-delimited JSON with TTS audio.

    Builds the Ollama request payload from the incoming chat request and
    returns a StreamingResponse whose body is produced by
    stream_text_and_voice: one base64-audio JSON line per sentence,
    followed by a final_text line with the whole answer.
    """
    payload = {
        "model": "qwen2.5:3b",  # must match the locally pulled Ollama model
        "messages": data.messages,
        "stream": True,  # required so Ollama emits token-by-token lines
        "options": {
            "temperature": 0.3,
            "top_p": 0.9,
        },
    }
    return StreamingResponse(
        stream_text_and_voice(payload, data.messages, data.voice, data.rate),
        # The body is newline-delimited JSON (audio is base64 inside JSON),
        # not raw MP3 bytes — "audio/mpeg" was incorrect here.
        media_type="application/x-ndjson",
        headers={"Cache-Control": "no-cache"},
    )
if __name__ == "__main__":
    # Serve on all interfaces; port 7860 to match the deployment setup.
    uvicorn.run(app, host="0.0.0.0", port=7860)