# spam_space_backend.py
# Install: pip install flask flask-socketio gradio_client
from flask import Flask, request
from flask_socketio import SocketIO, emit
from gradio_client import Client, handle_file
import os, base64, threading, time, logging
from datetime import datetime, timedelta
# ----------------- Logging -----------------
# Timestamp, severity and message for every record emitted by this module.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Module-level logger shared by the handlers and background workers below.
logger = logging.getLogger(__name__)
# ----------------- Flask + SocketIO -----------------
app = Flask(__name__)
# The 'threading' async mode is chosen for maximum compatibility on Spaces;
# CORS is left open to every origin.
_socketio_options = {"cors_allowed_origins": "*", "async_mode": 'threading'}
socketio = SocketIO(app, **_socketio_options)
# ----------------- HF Space -----------------
# URL of the Hugging Face Space that performs the voice cloning. Set the
# HF_SPACE_URL environment variable to point at a different Space; the
# default below is used when the variable is absent.
HF_SPACE_URL = os.environ.get("HF_SPACE_URL", "https://tonyassi-voice-clone.hf.space")
try:
    client = Client(HF_SPACE_URL)
    logger.info("Gradio Client loaded successfully!")
except Exception as e:
    logger.error(f"Failed to load client: {e}")
    # The bare `exit` helper is injected by the `site` module for interactive
    # use and may be missing (e.g. under `python -S` or in frozen builds).
    # Raising SystemExit terminates with the same exit status without
    # depending on it.
    raise SystemExit(1)
# ----------------- Task & Quota Tracking -----------------
# Maps a Socket.IO session id to bookkeeping for that client's in-flight request.
active_tasks = dict()
# Quota bookkeeping; both entries start out unset.
quota_info = dict.fromkeys(("reset_time", "retry_after"))
# ----------------- Routes -----------------
@app.route("/status")
def status_check():
    """Return a health summary: liveness flag, in-flight task count and quota reset time."""
    pending = len(active_tasks)
    reset_at = quota_info["reset_time"]
    return {"status": "ok", "active_tasks": pending, "quota_reset_time": reset_at}
# ----------------- SocketIO Events -----------------
@socketio.on("connect")
def handle_connect():
    """Log the newly connected Socket.IO session and acknowledge it with a status event."""
    logger.info(f"Client connected: {request.sid}")
    greeting = {"message": "Connected to backend"}
    emit("status", greeting)
@socketio.on("disconnect")
def handle_disconnect():
    """Log the disconnecting session and drop its entry from ``active_tasks``."""
    sid = request.sid
    logger.info(f"Client disconnected: {sid}")
    # The background worker started for this session also removes the entry
    # when it finishes. A separate membership check followed by `del` could
    # therefore raise KeyError if the worker ran in between; pop() with a
    # default does the lookup and the removal in one call.
    active_tasks.pop(sid, None)
@socketio.on("generate_voice")
def handle_generate_voice(data):
    """Validate a voice-generation request and hand it to a background thread.

    Args:
        data: Event payload sent by the client. It is expected to be a dict
            with a "text" entry and an "audio" entry (base64 audio, optionally
            wrapped in a ``data:`` URL); anything else is rejected.

    Emits:
        "error" to the caller when the payload is unusable or the worker
        could not be started; "status" once the request has been accepted.
    """
    sid = request.sid
    try:
        # A client may send any JSON value. Treat a non-dict payload as having
        # no fields so the caller gets the normal validation error instead of
        # an internal AttributeError message.
        payload = data if isinstance(data, dict) else {}
        text = payload.get("text")
        audio_base64 = payload.get("audio")
        if not text or not audio_base64:
            emit("error", {"message": "Text or audio missing"})
            return
        # Track active task
        active_tasks[sid] = {"start_time": datetime.now(), "status": "processing"}
        emit("status", {"message": "Processing request..."})
        # Run the generation off the event handler so it returns immediately.
        worker = threading.Thread(
            target=process_voice,
            args=(sid, text, audio_base64),
            daemon=True,
        )
        worker.start()
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Error in generate_voice: {e}")
        emit("error", {"message": f"Failed to process request: {str(e)}"})
        # pop() avoids a KeyError if another thread already removed the entry.
        active_tasks.pop(sid, None)
# ----------------- Voice Processing -----------------
def process_voice(sid, text, audio_base64):
    """Run one voice-generation request and send the result to the client.

    Decodes the reference audio into a temporary WAV file under /tmp, calls
    the Space's "/predict" endpoint with the text and that file, then emits
    the returned audio back to the requesting session as a base64 data URL.

    Args:
        sid: Socket.IO session id of the requesting client; used as the room
            for every emitted event and as the key in ``active_tasks``.
        text: Text passed to the Space as the first prediction argument.
        audio_base64: Reference audio as base64, optionally prefixed with a
            ``data:...,`` URL header.

    Any exception is logged and reported to the client as an "error" event;
    nothing is raised to the caller.
    """
    import tempfile  # local import: only this worker needs it

    temp_audio_path = None
    try:
        # Strip the "data:<mime>;base64," header when the client sent a data URL.
        if audio_base64.startswith("data:"):
            _, separator, encoded = audio_base64.partition(",")
            if not separator:
                raise ValueError("Malformed data URL: missing ',' separator")
            audio_base64 = encoded
        audio_bytes = base64.b64decode(audio_base64)
        # A name based on the sid alone would be shared by overlapping requests
        # from the same client, letting one request overwrite or delete the
        # other's file. mkstemp guarantees a unique name; the prefix and
        # suffix keep it recognisable to the periodic /tmp cleanup.
        fd, temp_audio_path = tempfile.mkstemp(
            prefix=f"temp_reference_{sid}_", suffix=".wav", dir="/tmp"
        )
        with os.fdopen(fd, "wb") as f:
            f.write(audio_bytes)
        # Call HF Space API
        socketio.emit("status", {"message": "Calling HF Space API..."}, room=sid)
        result_path = client.predict(text, handle_file(temp_audio_path), api_name="/predict")
        # Read result and send back
        # NOTE(review): the file at result_path is never deleted here —
        # confirm whether the client library cleans it up.
        with open(result_path, "rb") as f:
            output_audio = f.read()
        output_base64 = base64.b64encode(output_audio).decode("utf-8")
        socketio.emit("voice_generated", {"audio": f"data:audio/wav;base64,{output_base64}"}, room=sid)
        socketio.emit("status", {"message": "Generation complete"}, room=sid)
    except Exception as e:
        logger.exception(f"Error in process_voice: {e}")
        socketio.emit("error", {"message": f"Generation failed: {str(e)}"}, room=sid)
    finally:
        # Cleanup. The hourly cleanup thread may already have removed the
        # file, so a missing file is not an error.
        if temp_audio_path is not None:
            try:
                os.remove(temp_audio_path)
            except FileNotFoundError:
                pass
        # The disconnect handler may have removed the entry already.
        active_tasks.pop(sid, None)
# ----------------- Cleanup Thread -----------------
def cleanup_old_files():
    """Periodically delete stale reference-audio files from /tmp.

    Loops forever: on each pass it removes every ``temp_reference_*.wav``
    file in /tmp whose ctime is more than one hour old, then sleeps for an
    hour. It never returns, so it must run on its own thread.
    """
    while True:
        now = time.time()
        try:
            names = os.listdir("/tmp")
        except OSError as e:
            # An unreadable directory should not end the loop; retry next pass.
            logger.warning(f"Could not list /tmp: {e}")
            names = []
        for name in names:
            if not (name.startswith("temp_reference_") and name.endswith(".wav")):
                continue
            path = os.path.join("/tmp", name)
            try:
                if now - os.path.getctime(path) > 3600:  # 1 hour
                    os.remove(path)
                    logger.info(f"Removed old file: {name}")
            except OSError as e:
                # A worker may delete its own file between listdir() and here.
                # Without this handler a single failure would terminate the
                # cleanup thread for the rest of the process lifetime.
                logger.warning(f"Could not remove old file {name}: {e}")
        time.sleep(3600)
threading.Thread(target=cleanup_old_files, daemon=True).start()
# ----------------- Main -----------------
if __name__ == "__main__":
    logger.info("Starting backend...")
    # Debug mode turns on the Werkzeug interactive debugger and the reloader.
    # Because the server binds to every interface, leaving it on would let any
    # remote client reach the debugger, so it is enabled only on request by
    # setting FLASK_DEBUG=1 (or "true").
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true")
    socketio.run(
        app,
        host="0.0.0.0",
        port=5000,
        debug=debug_enabled,
        allow_unsafe_werkzeug=True,
    )