# voiceai-server / app.py
# Devity4756's picture
# Update app.py
# 8259369 verified
# spam_space_backend.py
# Install: pip install flask flask-socketio gradio_client
from flask import Flask, request
from flask_socketio import SocketIO, emit
from gradio_client import Client, handle_file
import os, base64, threading, time, logging
from datetime import datetime, timedelta
# ----------------- Logging -----------------
# Configure the root logger at import time: INFO level, with a timestamp and
# severity prefix on every record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the handlers and worker functions in this file.
logger = logging.getLogger(__name__)
# ----------------- Flask + SocketIO -----------------
app = Flask(__name__)
# Use 'threading' mode for maximum compatibility on Spaces
# NOTE(review): cors_allowed_origins="*" accepts Socket.IO connections from any
# origin -- confirm this is intended before exposing the server publicly.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
# ----------------- HF Space -----------------
HF_SPACE_URL = "https://tonyassi-voice-clone.hf.space" # Replace with your Space API URL
# Connect to the Space once at import time; the same client object is reused by
# every generation request.
try:
    client = Client(HF_SPACE_URL)
    logger.info("Gradio Client loaded successfully!")
except Exception as e:
    logger.error(f"Failed to load client: {e}")
    # Stop with a non-zero status: nothing below can work without the client.
    # SystemExit is raised directly instead of calling exit(), because exit()
    # is injected by the `site` module for interactive use and is not
    # guaranteed to exist (e.g. when Python runs with -S).
    raise SystemExit(1)
# ----------------- Task & Quota Tracking -----------------
# In-flight generation requests keyed by Socket.IO session id. Each value is a
# dict with "start_time" (datetime) and "status" (str). Read and written from
# both the Socket.IO handlers and the background worker threads.
active_tasks = {}
# Quota bookkeeping; "reset_time" is reported by the /status route. Nothing in
# the visible code ever updates these fields -- presumably reserved for
# rate-limit handling; TODO confirm.
quota_info = {"reset_time": None, "retry_after": None}
# ----------------- Routes -----------------
@app.route("/status")
def status_check():
    """Return a health payload: liveness flag, in-flight task count, quota reset time."""
    payload = {"status": "ok"}
    payload["active_tasks"] = len(active_tasks)
    payload["quota_reset_time"] = quota_info["reset_time"]
    return payload
# ----------------- SocketIO Events -----------------
@socketio.on("connect")
def handle_connect():
    """Log the newly connected Socket.IO session and acknowledge it to the client."""
    session_id = request.sid
    logger.info(f"Client connected: {session_id}")
    greeting = {"message": "Connected to backend"}
    emit("status", greeting)
@socketio.on("disconnect")
def handle_disconnect():
    """Log the disconnect and drop any task record held for the session."""
    sid = request.sid
    logger.info(f"Client disconnected: {sid}")
    # pop() rather than "check, then del": process_voice() removes the same key
    # from its worker thread when it finishes, so the two-step form could raise
    # KeyError if the worker wins the race between the check and the delete.
    active_tasks.pop(sid, None)
@socketio.on("generate_voice")
def handle_generate_voice(data):
    """Validate a voice-generation request and hand it to a worker thread.

    Args:
        data: Event payload. Expected to be a dict with a "text" entry and an
            "audio" entry (base64 string, optionally prefixed as a data URL).

    Emits "error" to the caller when the payload is unusable or the worker
    cannot be started; otherwise emits "status" and returns immediately while
    process_voice() runs in the background.
    """
    sid = request.sid
    try:
        # A payload that is not a dict (None, str, list, ...) cannot carry the
        # required fields. Report it like any other incomplete request instead
        # of leaking an AttributeError message to the client.
        if not isinstance(data, dict):
            emit("error", {"message": "Text or audio missing"})
            return
        text = data.get("text")
        audio_base64 = data.get("audio")
        if not text or not audio_base64:
            emit("error", {"message": "Text or audio missing"})
            return
        # Track active task
        active_tasks[sid] = {"start_time": datetime.now(), "status": "processing"}
        emit("status", {"message": "Processing request..."})
        # Process in background thread so the Socket.IO handler is not blocked
        # for the duration of the remote call.
        threading.Thread(target=process_voice, args=(sid, text, audio_base64), daemon=True).start()
    except Exception as e:
        # logger.exception keeps the same message but also records the traceback.
        logger.exception(f"Error in generate_voice: {e}")
        emit("error", {"message": f"Failed to process request: {str(e)}"})
        # pop() is safe even if the worker or a disconnect already removed the key.
        active_tasks.pop(sid, None)
# ----------------- Voice Processing -----------------
def process_voice(sid, text, audio_base64):
    """Run one voice-clone request against the HF Space and reply to the client.

    Decodes the reference audio into a temporary .wav file under /tmp, calls
    the Space's "/predict" endpoint with the text and that file, then emits the
    returned audio to the session as a base64 ``data:audio/wav`` URL.

    Args:
        sid: Socket.IO session id; used as the room for every emit and as the
            key in ``active_tasks``.
        text: Passed unchanged as the first argument to the Space.
        audio_base64: Reference audio as a base64 string, optionally prefixed
            with a ``data:...;base64,`` header.

    Returns:
        None. Failures are logged and reported to the client as an "error"
        event; no exception is raised to the caller.
    """
    import tempfile  # local import: only this function needs it

    temp_audio_path = None
    try:
        # Decode audio (drop the data-URL header, keeping everything after the
        # first comma)
        if audio_base64.startswith("data:"):
            audio_base64 = audio_base64.split(",", 1)[1]
        audio_bytes = base64.b64decode(audio_base64)
        # Use a unique file per request. A path built from sid alone is shared
        # by overlapping requests from the same session, so one request could
        # overwrite or delete the file another is still uploading. The
        # "temp_reference_" prefix, ".wav" suffix and /tmp location are kept so
        # cleanup_old_files() still recognises leftovers.
        fd, temp_audio_path = tempfile.mkstemp(
            prefix=f"temp_reference_{sid}_", suffix=".wav", dir="/tmp"
        )
        with os.fdopen(fd, "wb") as f:
            f.write(audio_bytes)
        # Call HF Space API
        socketio.emit("status", {"message": "Calling HF Space API..."}, room=sid)
        result_path = client.predict(text, handle_file(temp_audio_path), api_name="/predict")
        # Read result and send back
        # NOTE(review): the file at result_path is never removed here --
        # confirm whether the Gradio client's download directory needs pruning.
        with open(result_path, "rb") as f:
            output_audio = f.read()
        output_base64 = base64.b64encode(output_audio).decode("utf-8")
        socketio.emit("voice_generated", {"audio": f"data:audio/wav;base64,{output_base64}"}, room=sid)
        socketio.emit("status", {"message": "Generation complete"}, room=sid)
    except Exception as e:
        # Top-level boundary of the worker thread: record the traceback and
        # tell the client, since nobody else can observe this exception.
        logger.exception(f"Error in process_voice: {e}")
        socketio.emit("error", {"message": f"Generation failed: {str(e)}"}, room=sid)
    finally:
        # Cleanup. Drop the task record first so it is released even if file
        # removal fails; pop() tolerates the key already being gone (disconnect).
        active_tasks.pop(sid, None)
        if temp_audio_path is not None:
            try:
                os.remove(temp_audio_path)
            except FileNotFoundError:
                # Already removed, e.g. by cleanup_old_files().
                pass
            except OSError as e:
                logger.warning(f"Could not remove temp file {temp_audio_path}: {e}")
# ----------------- Cleanup Thread -----------------
def cleanup_old_files():
    """Delete stale reference-audio files from /tmp once an hour, forever.

    Meant to run in a daemon thread; it never returns. A file is removed when
    its name starts with "temp_reference_", ends with ".wav", and its ctime is
    more than 3600 seconds in the past. Filesystem errors are logged (or
    ignored when the file has simply vanished) so that a single failure cannot
    terminate the loop.
    """
    while True:
        now = time.time()
        try:
            names = os.listdir("/tmp")
        except OSError as e:
            logger.warning(f"Could not list /tmp for cleanup: {e}")
            names = []
        for f in names:
            if not (f.startswith("temp_reference_") and f.endswith(".wav")):
                continue
            path = os.path.join("/tmp", f)
            try:
                if now - os.path.getctime(path) > 3600:  # 1 hour
                    os.remove(path)
                    logger.info(f"Removed old file: {f}")
            except FileNotFoundError:
                # process_voice() deletes its own file when it finishes, so an
                # entry can disappear between listdir() and here.
                continue
            except OSError as e:
                # Keep going: an unhandled error here would end the thread and
                # silently stop all future cleanup.
                logger.warning(f"Could not clean up {path}: {e}")
        time.sleep(3600)
# Start the periodic /tmp cleanup as a daemon thread so it never blocks
# interpreter shutdown. This runs at import time, not only under __main__.
threading.Thread(target=cleanup_old_files, daemon=True).start()
# ----------------- Main -----------------
if __name__ == "__main__":
    logger.info("Starting backend...")
    # Debug mode is opt-in via FLASK_DEBUG. The server binds to 0.0.0.0, so a
    # hard-coded debug=True would expose the Werkzeug interactive debugger to
    # every client that can reach the port, and would enable the reloader,
    # which runs this module a second time (second cleanup thread, second
    # Gradio client).
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    socketio.run(app, host="0.0.0.0", port=5000, debug=debug_enabled, allow_unsafe_werkzeug=True)