Spaces:
Paused
Paused
| # spam_space_backend.py | |
| # Install: pip install flask flask-socketio gradio_client | |
| from flask import Flask, request | |
| from flask_socketio import SocketIO, emit | |
| from gradio_client import Client, handle_file | |
| import os, base64, threading, time, logging | |
| from datetime import datetime, timedelta | |
| # ----------------- Logging ----------------- | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # ----------------- Flask + SocketIO ----------------- | |
| app = Flask(__name__) | |
| # Use 'threading' mode for maximum compatibility on Spaces | |
| socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading') | |
| # ----------------- HF Space ----------------- | |
| HF_SPACE_URL = "https://tonyassi-voice-clone.hf.space" # Replace with your Space API URL | |
| try: | |
| client = Client(HF_SPACE_URL) | |
| logger.info("Gradio Client loaded successfully!") | |
| except Exception as e: | |
| logger.error(f"Failed to load client: {e}") | |
| exit(1) | |
| # ----------------- Task & Quota Tracking ----------------- | |
| active_tasks = {} | |
| quota_info = {"reset_time": None, "retry_after": None} | |
| # ----------------- Routes ----------------- | |
| def status_check(): | |
| return {"status": "ok", "active_tasks": len(active_tasks), "quota_reset_time": quota_info["reset_time"]} | |
| # ----------------- SocketIO Events ----------------- | |
| def handle_connect(): | |
| sid = request.sid | |
| logger.info(f"Client connected: {sid}") | |
| emit("status", {"message": "Connected to backend"}) | |
| def handle_disconnect(): | |
| sid = request.sid | |
| logger.info(f"Client disconnected: {sid}") | |
| if sid in active_tasks: | |
| del active_tasks[sid] | |
| def handle_generate_voice(data): | |
| sid = request.sid | |
| try: | |
| text = data.get("text") | |
| audio_base64 = data.get("audio") | |
| if not text or not audio_base64: | |
| emit("error", {"message": "Text or audio missing"}) | |
| return | |
| # Track active task | |
| active_tasks[sid] = {"start_time": datetime.now(), "status": "processing"} | |
| emit("status", {"message": "Processing request..."}) | |
| # Process in background thread | |
| threading.Thread(target=process_voice, args=(sid, text, audio_base64), daemon=True).start() | |
| except Exception as e: | |
| logger.error(f"Error in generate_voice: {e}") | |
| emit("error", {"message": f"Failed to process request: {str(e)}"}) | |
| if sid in active_tasks: | |
| del active_tasks[sid] | |
| # ----------------- Voice Processing ----------------- | |
| def process_voice(sid, text, audio_base64): | |
| temp_audio_path = f"/tmp/temp_reference_{sid}.wav" | |
| try: | |
| # Decode audio | |
| if audio_base64.startswith("data:"): | |
| audio_base64 = audio_base64.split(",")[1] | |
| with open(temp_audio_path, "wb") as f: | |
| f.write(base64.b64decode(audio_base64)) | |
| # Call HF Space API | |
| socketio.emit("status", {"message": "Calling HF Space API..."}, room=sid) | |
| result_path = client.predict(text, handle_file(temp_audio_path), api_name="/predict") | |
| # Read result and send back | |
| with open(result_path, "rb") as f: | |
| output_audio = f.read() | |
| output_base64 = base64.b64encode(output_audio).decode("utf-8") | |
| socketio.emit("voice_generated", {"audio": f"data:audio/wav;base64,{output_base64}"}, room=sid) | |
| socketio.emit("status", {"message": "Generation complete"}, room=sid) | |
| except Exception as e: | |
| logger.error(f"Error in process_voice: {e}") | |
| socketio.emit("error", {"message": f"Generation failed: {str(e)}"}, room=sid) | |
| finally: | |
| # Cleanup | |
| if os.path.exists(temp_audio_path): | |
| os.remove(temp_audio_path) | |
| if sid in active_tasks: | |
| del active_tasks[sid] | |
| # ----------------- Cleanup Thread ----------------- | |
| def cleanup_old_files(): | |
| while True: | |
| now = time.time() | |
| for f in os.listdir("/tmp"): | |
| if f.startswith("temp_reference_") and f.endswith(".wav"): | |
| path = os.path.join("/tmp", f) | |
| if now - os.path.getctime(path) > 3600: # 1 hour | |
| os.remove(path) | |
| logger.info(f"Removed old file: {f}") | |
| time.sleep(3600) | |
| threading.Thread(target=cleanup_old_files, daemon=True).start() | |
| # ----------------- Main ----------------- | |
| if __name__ == "__main__": | |
| logger.info("Starting backend...") | |
| socketio.run(app, host="0.0.0.0", port=5000, debug=True, allow_unsafe_werkzeug=True) |