import os import json import time import threading import logging import shutil import urllib.parse from flask import Flask, request, jsonify, send_from_directory, Response, send_file from flask_cors import CORS from huggingface_hub import HfApi, hf_hub_download import requests from huggingface_hub.utils import RepositoryNotFoundError, EntryNotFoundError # --- تنظیمات دیتابیس ابری --- DATASET_REPO = "ezmarynoori/Karbaran-rayegan-tedad" DATASET_FILENAME = "voice_conversion_usage_data.json" BALE_CACHE_FILENAME = "voice_bale_cache.json" USAGE_LIMIT_PER_MODEL = 1 # محدودیت تعداد در روز برای نسخه رایگان HF_TOKEN = os.environ.get("HF_TOKEN") BALE_BOT_TOKEN = os.environ.get("BALE_BOT_TOKEN", "").strip() # پوشه بیلد شده توسط Docker STATIC_FOLDER = 'dist' # --- راه‌اندازی برنامه و پوشه‌های کاربری --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') app = Flask(__name__, static_folder=STATIC_FOLDER, static_url_path='') CORS(app) os.makedirs("workspace/temp_downloads", exist_ok=True) os.makedirs("workspace/static_audio", exist_ok=True) # --- متغیرهای کش و کنترل داده --- usage_data_cache = [] bale_cache_data = {} cache_lock = threading.Lock() data_changed = threading.Event() api = None if not HF_TOKEN: logging.error("هشدار: توکن HF_TOKEN یافت نشد. دیتابیس کار نخواهد کرد.") else: api = HfApi(token=HF_TOKEN) # --- لود اولیه داده‌ها --- def load_initial_data(): global usage_data_cache, bale_cache_data with cache_lock: # اطلاعات مصرف کاربران دیگر از سرور خوانده نمی‌شود و به صورت محلی در RAM شروع به کار می‌کند usage_data_cache = [] if not api: return # بارگیری اطلاعات مربوط به کش دائمی بله try: local_cache_path = hf_hub_download( repo_id=DATASET_REPO, filename=BALE_CACHE_FILENAME, repo_type="dataset", token=HF_TOKEN, force_download=True ) with open(local_cache_path, 'r', encoding='utf-8') as f: content = f.read() if content: bale_cache_data = json.loads(content) except (RepositoryNotFoundError, EntryNotFoundError): bale_cache_data = {} except Exception as e: logging.warning(f"Error loading bale cache: {e}") bale_cache_data = {} # --- ذخیره‌سازی در پس‌زمینه --- def persist_data_to_hub(): if not api: return bale_snapshot = None with cache_lock: if data_changed.is_set(): # اطلاعات مصرف کاربران دیگر در گیت‌هاب ذخیره نمی‌شود bale_snapshot = dict(bale_cache_data) data_changed.clear() if bale_snapshot is not None: temp_bale_filepath = "/tmp/temp_bale_cache.json" try: with open(temp_bale_filepath, 'w', encoding='utf-8') as f: json.dump(bale_snapshot, f, ensure_ascii=False, indent=2) api.upload_file( path_or_fileobj=temp_bale_filepath, path_in_repo=BALE_CACHE_FILENAME, repo_id=DATASET_REPO, repo_type="dataset", commit_message="Update bale cache" ) logging.info("Bale cache data saved to HF Hub.") except Exception as e: logging.error(f"Failed to save bale cache data: {e}") data_changed.set() def background_persister(): while True: time.sleep(300) # هر 5 دقیقه بررسی برای ذخیره‌سازی persist_data_to_hub() def get_user_identifier(data): fingerprint = data.get('fingerprint') if fingerprint: return str(fingerprint) if request.headers.getlist("X-Forwarded-For"): return request.headers.getlist("X-Forwarded-For")[0].split(',')[0].strip() return request.remote_addr # --- تابع کمکی ارائه‌دهنده پروتکل امن HTTPS --- def get_secure_host_url(): """تبدیل خودکار آدرس هاست ناامن به https جهت برطرف کردن خطای لود مدیا در مرورگر""" host_url = request.host_url.rstrip('/') if host_url.startswith("http://") and not any(x in host_url for x in ["localhost", "127.0.0.1", "0.0.0.0"]): host_url = "https://" + host_url[7:] return host_url # --- مدیریت کش بله در حافظه --- def get_cached_bale_url(job_id): with cache_lock: return bale_cache_data.get(job_id) def save_cached_bale_url(job_id, url): with cache_lock: bale_cache_data[job_id] = url data_changed.set() # --- متد اصلی ارتباط با پیام‌رسان بله --- def upload_to_bale(file_path): """آپلود فایل صوتی در بله و دریافت مستقیم لینک دانلود دائمی""" if not BALE_BOT_TOKEN: logging.warning("Bale Token in settings is not configured.") return None try: bale_chat_id = BALE_BOT_TOKEN.split(":")[0] except Exception as e: logging.error(f"Error parsing Bale Token chat_id: {e}") return None bale_url = f"https://tapi.bale.ai/bot{BALE_BOT_TOKEN}/sendDocument" for attempt in range(3): try: with open(file_path, 'rb') as f: files = {'document': (os.path.basename(file_path), f)} data = {'chat_id': bale_chat_id} resp = requests.post(bale_url, data=data, files=files, timeout=150) if resp.status_code == 200: bale_data = resp.json() if bale_data.get("ok"): result = bale_data.get("result", {}) doc = result.get("document", {}) file_id = doc.get("file_id") if not file_id: file_id = result.get("video", {}).get("file_id") or result.get("audio", {}).get("file_id") if file_id: return f"https://tapi.bale.ai/file/bot{BALE_BOT_TOKEN}/{file_id}" else: logging.warning(f"Bale HTTP server status: {resp.status_code}") except Exception as e: logging.error(f"Error uploading to Bale (attempt {attempt+1}): {e}") time.sleep(2) return None # --- اسکنر دوره‌ای پاکسازی فضای هارددیسک --- def periodic_cleanup(): """حذف فایل‌های صوتی باقی‌مانده بالای ۵۰ مگابایت پس از ۳ روز""" while True: try: now = time.time() three_days_ago = now - (3 * 24 * 60 * 60) # تمیز کردن موقت‌ها temp_dir = "workspace/temp_downloads" if os.path.exists(temp_dir): for filename in os.listdir(temp_dir): file_path = os.path.join(temp_dir, filename) if os.path.isfile(file_path) and os.path.getmtime(file_path) < three_days_ago: try: os.remove(file_path) except Exception: pass # تمیز کردن پوشه استاتیک بالای ۵۰ مگابایت static_dir = "workspace/static_audio" if os.path.exists(static_dir): for filename in os.listdir(static_dir): file_path = os.path.join(static_dir, filename) if os.path.isfile(file_path) and os.path.getmtime(file_path) < three_days_ago: try: os.remove(file_path) except Exception: pass except Exception as e: logging.error(f"Cleanup Worker Error: {e}") time.sleep(3600) # --- API Endpoints --- @app.route('/') def index(): return send_from_directory(STATIC_FOLDER, 'index.html') # مسیر فایل‌های صوتی با پشتیبانی از ریدایرکت خودکار به بله در صورت حذف فیزیکی @app.route('/static/audio/') def serve_static_audio(filename): local_path = os.path.join("workspace/static_audio", filename) # ۱. اگر فایل به صورت فیزیکی روی دیسک موجود بود، آن را ارسال کن if os.path.exists(local_path): return send_from_directory("workspace/static_audio", filename) # ۲. اگر فایل حذف شده بود، بررسی کن که آیا کش بله برای آن وجود دارد یا خیر # استخراج تمیز شناسه پروژه (job_id) از نام فایل job_id = filename.split('.')[0].split('_')[0] cached_url = get_cached_bale_url(job_id) if cached_url: from flask import redirect # هدایت مرورگر به آدرس استریم پراکسی‌شده بله به صورت کاملاً خودکار return redirect(cached_url) return "فایل صوتی مورد نظر یافت نشد.", 404 @app.route('/') def serve_static(path): if os.path.exists(os.path.join(STATIC_FOLDER, path)): return send_from_directory(STATIC_FOLDER, path) return send_from_directory(STATIC_FOLDER, 'index.html') @app.route('/api/proxy_bale') def proxy_bale(): """پراکسی هوشمند با استریم محلی استاندارد برای باز کردن گره پخش پلیرهای موبایل""" url = request.args.get('url') job_id = request.args.get('job_id', 'unknown') filename = request.args.get('filename', 'audio.wav') if not url or not url.startswith('https://tapi.bale.ai/'): return "آدرس نامعتبر است.", 400 local_dir = "workspace/temp_downloads" os.makedirs(local_dir, exist_ok=True) # استفاده از نام فایل ایمن و همگام با استانداردهای کاراکتری سیستم‌عامل ext = filename.split('.')[-1].lower() if '.' in filename else 'wav' local_path = os.path.join(local_dir, f"{job_id}.{ext}") # ۱. اگر فایل به صورت موقت کش نشده است، آن را از بله به صورت پرسرعت بارگیری کن if not os.path.exists(local_path): try: resp = requests.get(url, timeout=30) if resp.status_code == 200: with open(local_path, 'wb') as f: f.write(resp.content) else: return "خطا در واکشی فایل از بله", 404 except Exception as e: return f"خطا در ارتباط با سرور: {str(e)}", 500 # ۲. تعیین نوع صوتی صریح MIME-Type برای حل باگ بلاک شدن مرورگرهای موبایل mimetype = "audio/wav" if ext == "mp3": mimetype = "audio/mpeg" elif ext == "ogg": mimetype = "audio/ogg" elif ext == "aac": mimetype = "audio/aac" # ۳. ارسال فایل با استفاده از سیستم پیشرفته Range-Request فلاسک برای هماهنگی با پلیرها try: response = send_file(local_path, mimetype=mimetype, conditional=True) response.headers['Access-Control-Allow-Origin'] = '*' response.headers['Access-Control-Allow-Headers'] = 'Range, Content-Type' return response except Exception as e: return str(e), 500 @app.route('/api/status/', methods=['GET']) def proxy_status(job_id): """رهگیری وضعیت رندر و بازسازی خروجی به لینک‌های دائمی بله یا موقت داخلی با پشتیبانی از HTTPS""" cached_url = get_cached_bale_url(job_id) if cached_url: return jsonify({ "status": "ready", "url": cached_url, "message": "تکمیل شد (آرشیو دائمی)" }) url = f"https://opera8-action.hf.space/api/status/{job_id}" try: resp = requests.get(url, timeout=10) if resp.status_code != 200: return jsonify({"status": "processing", "message": "ارتباط با سرور برقرار نشد"}), 200 data = resp.json() if data.get("status") == "ready": remote_relative_url = data.get("url") if remote_relative_url: if remote_relative_url.startswith("http"): full_remote_url = remote_relative_url else: full_remote_url = f"https://opera8-action.hf.space{remote_relative_url}" # دانلود و اندازه‌گیری حجم فایل صوتی local_dir = "workspace/temp_downloads" filename = remote_relative_url.split("/")[-1] ext = filename.split('.')[-1].lower() if '.' in filename else 'wav' local_path = os.path.join(local_dir, f"{job_id}.{ext}") try: file_resp = requests.get(full_remote_url, stream=True, timeout=30) if file_resp.status_code == 200: with open(local_path, 'wb') as f: shutil.copyfileobj(file_resp.raw, f) file_size = os.path.getsize(local_path) bale_url = None # اگر حجم زیر ۵۰ مگابایت بود و توکن تنظیم شده بود if file_size < 52428800 and BALE_BOT_TOKEN: bale_url = upload_to_bale(local_path) if bale_url: # حذف فایل موقت از دایرکتوری پس از آپلود موفق به بله برای بهینه‌سازی دیسک try: os.remove(local_path) except Exception: pass # استفاده از متد امن get_secure_host_url برای تضمین ساخت آدرس HTTPS proxied_url = f"{get_secure_host_url()}/api/proxy_bale?url={urllib.parse.quote(bale_url)}&job_id={job_id}&filename={urllib.parse.quote(filename)}" save_cached_bale_url(job_id, proxied_url) data["url"] = proxied_url else: # فایل‌های بالای ۵۰ مگابایت در مسیر موقت ذخیره شده و پس از ۳ روز خودکار حذف می‌شوند local_static_dir = "workspace/static_audio" final_local_path = os.path.join(local_static_dir, f"{job_id}.{ext}") shutil.move(local_path, final_local_path) local_served_url = f"{get_secure_host_url()}/static/audio/{job_id}.{ext}" save_cached_bale_url(job_id, local_served_url) data["url"] = local_served_url except Exception as e: logging.error(f"Error managing file download/upload for {job_id}: {e}") data["url"] = full_remote_url return jsonify(data) except Exception as e: logging.error(f"Error in proxy status API: {e}") return jsonify({"status": "processing", "message": "خطا در بررسی از سرور پردازش موازی"}), 200 @app.route('/api/check-credit', methods=['POST']) def check_credit(): data = request.get_json() if not data: return jsonify({"error": "Invalid request"}), 400 user_id = get_user_identifier(data) model_id = data.get('model_id', 'custom') with cache_lock: now = time.time() one_day = 24 * 60 * 60 user_record = next((u for u in usage_data_cache if u.get('id') == user_id), None) credits_remaining = USAGE_LIMIT_PER_MODEL limit_reached = False reset_timestamp = 0 if user_record: if user_record.get('day_start', 0) < (now - one_day): user_record['usage'] = {} user_record['day_start'] = now usage_dict = user_record.get('usage', {}) current_model_usage = usage_dict.get(model_id, 0) credits_remaining = max(0, USAGE_LIMIT_PER_MODEL - current_model_usage) if credits_remaining == 0: limit_reached = True reset_timestamp = user_record.get('day_start', now) + one_day return jsonify({ "credits_remaining": credits_remaining, "limit_reached": limit_reached, "reset_timestamp": reset_timestamp, "model_id": model_id }) @app.route('/api/use-credit', methods=['POST']) def use_credit(): data = request.get_json() if not data: return jsonify({"error": "Invalid request"}), 400 user_id = get_user_identifier(data) model_id = data.get('model_id', 'custom') with cache_lock: now = time.time() one_day = 24 * 60 * 60 user_record = next((u for u in usage_data_cache if u.get('id') == user_id), None) if user_record: if user_record.get('day_start', 0) < (now - one_day): user_record['usage'] = {} user_record['day_start'] = now if 'usage' not in user_record: user_record['usage'] = {} current_model_usage = user_record['usage'].get(model_id, 0) if current_model_usage >= USAGE_LIMIT_PER_MODEL: reset_timestamp = user_record.get('day_start', now) + one_day return jsonify({ "status": "limit_reached", "credits_remaining": 0, "reset_timestamp": reset_timestamp }), 429 user_record['usage'][model_id] = current_model_usage + 1 else: user_record = { "id": user_id, "day_start": now, "usage": {model_id: 1} } usage_data_cache.append(user_record) credits_remaining = USAGE_LIMIT_PER_MODEL - user_record['usage'][model_id] # در این بخش دیگر تریگر ذخیره در هاب گیت‌هاب زده نمی‌شود و فقط درون RAM است return jsonify({"status": "success", "credits_remaining": credits_remaining}) # --- Main --- if __name__ != '__main__': load_initial_data() t = threading.Thread(target=background_persister, daemon=True) t.start() t_cleanup = threading.Thread(target=periodic_cleanup, daemon=True) t_cleanup.start() if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)