# MaenGit update a7bb300 -- stray VCS/paste residue converted to a comment so the module parses
import os
import json
import httpx
import asyncio
import logging
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import edge_tts
import uvicorn
import base64
# Logging setup (original comment in Arabic: "log settings")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FastAPI application instance used by the route decorators below.
app = FastAPI()
# Enable CORS so the Next.js frontend can call this API (translated from Arabic).
# NOTE(review): allow_origins=["*"] accepts any origin — tighten for production.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Local Ollama chat endpoint (non-streaming requests are posted here).
OLLAMA_URL = "http://localhost:11434/api/chat"
class ChatRequest(BaseModel):
    """Request body for the /stream-voice endpoint.

    Fields mirror the frontend payload:
    - messages: chat history, a list of {"role", "content"} dicts.
    - voice: edge-tts voice identifier.
    - rate: edge-tts speaking-rate string (e.g. "+0%").
    - temp: sampling temperature forwarded to Ollama.
    - top_penalty: forwarded as Ollama's presence_penalty option.
    - top_p: nucleus-sampling cutoff forwarded to Ollama.
    """

    messages: list
    voice: str = "ar-SA-HamedNeural"
    rate: str = "+0%"
    temp: float = 0.8
    top_penalty: float = 0.8
    top_p: float = 0.8
async def get_full_voice_and_text(payload, voice, rate):
    """Query Ollama for a complete (non-streaming) reply, then optionally
    synthesize it to speech with edge-tts.

    Args:
        payload: dict payload for the Ollama /api/chat endpoint; this
            function forces ``stream=False`` and ``format="json"`` so the
            model replies with a single JSON object of the shape
            {"state": "voice"|"message", "content": "..."}.
        voice: edge-tts voice name (e.g. "ar-SA-HamedNeural").
        rate: edge-tts speaking-rate string (e.g. "+0%").

    Returns:
        dict — one of:
          - {"status": "success", "state": "voice", "audio": <base64 mp3>, "text": ...}
          - {"status": "success", "state": "message", "text": ...}
            (also the fallback when TTS fails after retries)
          - {"status": "error", "message": ...} when the Ollama request fails.
    """
    async with httpx.AsyncClient(timeout=None) as client:
        try:
            # 1. Get the full response from Ollama in one shot.
            payload["stream"] = False
            payload["format"] = "json"  # ask the model for a JSON object
            response = await client.post(OLLAMA_URL, json=payload)
            response.raise_for_status()
            result = response.json()
            logger.info("Ollama raw result: %s", result)
            full_text = result.get("message", {}).get("content", "")

            # The model should emit {"state": ..., "content": ...}; if the
            # JSON is malformed we keep an empty dict and fall back below.
            full_result = {}
            try:
                full_result = json.loads(full_text)
            except Exception as e:
                logger.error("Failed to parse model JSON: %s", e)

            state = full_result.get("state")
            logger.info("Model state: %s", state)
            # Fall back to the raw model text when "content" is missing so
            # callers never receive the literal placeholder string "text"
            # (bug in the previous version's default).
            content = full_result.get("content", full_text)

            if state == "voice":
                # 2./3. Convert the entire text to speech and base64-encode it.
                audio_base64 = await _synthesize_audio(content, voice, rate)
                if audio_base64 is not None:
                    return {
                        "status": "success",
                        "state": "voice",
                        "audio": audio_base64,
                        "text": content,
                    }
                # TTS failed after retries: degrade gracefully to text-only.
                return {
                    "status": "success",
                    "state": "message",
                    "text": content,
                }

            # Any non-"voice" state is treated as a plain text message.
            return {
                "status": "success",
                "state": "message",
                "text": content,
            }
        except Exception as e:
            logger.error("Error: %s", e)
            return {"status": "error", "message": str(e)}


async def _synthesize_audio(text, voice, rate, retries=2):
    """Convert *text* to speech via edge-tts.

    Returns the audio as a base64 string, or None when every attempt
    yields no audio (edge-tts occasionally returns empty streams).
    """
    for attempt in range(retries):
        try:
            communicate = edge_tts.Communicate(text, voice, rate=rate)
            audio_data = b""
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    audio_data += chunk["data"]
            if audio_data:
                return base64.b64encode(audio_data).decode("utf-8")
            raise Exception("Empty audio")
        except Exception as e:
            logger.warning("Attempt %d failed: %s", attempt + 1, e)
            await asyncio.sleep(1)  # brief pause before retrying
    return None
@app.post("/stream-voice")
async def voice_engine(data: ChatRequest):
# print(data.messages)
clean_messages = []
for m in data.messages:
content = m.get("content")
if isinstance(content, dict):
content = content.get("message") or content.get("text")
if isinstance(content, str):
clean_messages.append({"role": m["role"], "content": content})
if not clean_messages:
raise HTTPException(status_code=400, detail="No valid messages")
print("Clean Messages is ")
print(clean_messages)
payload = {
"model": "llama3.2",
"messages": clean_messages,
"stream": False, # Set to False for non-streaming
"options": {
"temperature": data.temp, # Increased for more personality
"top_p": data.top_p,
"presence_penalty": data.top_penalty, # Encourages the model to talk more like a human
}
}
result = await get_full_voice_and_text(payload, data.voice, data.rate)
return result # Returns a single JSON object
@app.get("/")
async def check():
return {
"status": "success",
"state":"ok"
}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)