taghirsado / app.py
Opera10's picture
Update app.py
1386a8f verified
Raw
History Blame Contribute Delete
19.6 kB
import os
import json
import time
import threading
import logging
import shutil
import urllib.parse
from flask import Flask, request, jsonify, send_from_directory, Response, send_file
from flask_cors import CORS
from huggingface_hub import HfApi, hf_hub_download
import requests
from huggingface_hub.utils import RepositoryNotFoundError, EntryNotFoundError
# --- تنظیمات دیتابیس ابری ---
DATASET_REPO = "ezmarynoori/Karbaran-rayegan-tedad"
DATASET_FILENAME = "voice_conversion_usage_data.json"
BALE_CACHE_FILENAME = "voice_bale_cache.json"
USAGE_LIMIT_PER_MODEL = 1 # محدودیت تعداد در روز برای نسخه رایگان
HF_TOKEN = os.environ.get("HF_TOKEN")
BALE_BOT_TOKEN = os.environ.get("BALE_BOT_TOKEN", "").strip()
# پوشه بیلد شده توسط Docker
STATIC_FOLDER = 'dist'
# --- راه‌اندازی برنامه و پوشه‌های کاربری ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = Flask(__name__, static_folder=STATIC_FOLDER, static_url_path='')
CORS(app)
os.makedirs("workspace/temp_downloads", exist_ok=True)
os.makedirs("workspace/static_audio", exist_ok=True)
# --- متغیرهای کش و کنترل داده ---
usage_data_cache = []
bale_cache_data = {}
cache_lock = threading.Lock()
data_changed = threading.Event()
api = None
if not HF_TOKEN:
logging.error("هشدار: توکن HF_TOKEN یافت نشد. دیتابیس کار نخواهد کرد.")
else:
api = HfApi(token=HF_TOKEN)
# --- لود اولیه داده‌ها ---
def load_initial_data():
global usage_data_cache, bale_cache_data
with cache_lock:
# اطلاعات مصرف کاربران دیگر از سرور خوانده نمی‌شود و به صورت محلی در RAM شروع به کار می‌کند
usage_data_cache = []
if not api: return
# بارگیری اطلاعات مربوط به کش دائمی بله
try:
local_cache_path = hf_hub_download(
repo_id=DATASET_REPO,
filename=BALE_CACHE_FILENAME,
repo_type="dataset",
token=HF_TOKEN,
force_download=True
)
with open(local_cache_path, 'r', encoding='utf-8') as f:
content = f.read()
if content:
bale_cache_data = json.loads(content)
except (RepositoryNotFoundError, EntryNotFoundError):
bale_cache_data = {}
except Exception as e:
logging.warning(f"Error loading bale cache: {e}")
bale_cache_data = {}
# --- ذخیره‌سازی در پس‌زمینه ---
def persist_data_to_hub():
if not api: return
bale_snapshot = None
with cache_lock:
if data_changed.is_set():
# اطلاعات مصرف کاربران دیگر در گیت‌هاب ذخیره نمی‌شود
bale_snapshot = dict(bale_cache_data)
data_changed.clear()
if bale_snapshot is not None:
temp_bale_filepath = "/tmp/temp_bale_cache.json"
try:
with open(temp_bale_filepath, 'w', encoding='utf-8') as f:
json.dump(bale_snapshot, f, ensure_ascii=False, indent=2)
api.upload_file(
path_or_fileobj=temp_bale_filepath,
path_in_repo=BALE_CACHE_FILENAME,
repo_id=DATASET_REPO,
repo_type="dataset",
commit_message="Update bale cache"
)
logging.info("Bale cache data saved to HF Hub.")
except Exception as e:
logging.error(f"Failed to save bale cache data: {e}")
data_changed.set()
def background_persister():
while True:
time.sleep(300) # هر 5 دقیقه بررسی برای ذخیره‌سازی
persist_data_to_hub()
def get_user_identifier(data):
fingerprint = data.get('fingerprint')
if fingerprint: return str(fingerprint)
if request.headers.getlist("X-Forwarded-For"):
return request.headers.getlist("X-Forwarded-For")[0].split(',')[0].strip()
return request.remote_addr
# --- تابع کمکی ارائه‌دهنده پروتکل امن HTTPS ---
def get_secure_host_url():
"""تبدیل خودکار آدرس هاست ناامن به https جهت برطرف کردن خطای لود مدیا در مرورگر"""
host_url = request.host_url.rstrip('/')
if host_url.startswith("http://") and not any(x in host_url for x in ["localhost", "127.0.0.1", "0.0.0.0"]):
host_url = "https://" + host_url[7:]
return host_url
# --- مدیریت کش بله در حافظه ---
def get_cached_bale_url(job_id):
with cache_lock:
return bale_cache_data.get(job_id)
def save_cached_bale_url(job_id, url):
with cache_lock:
bale_cache_data[job_id] = url
data_changed.set()
# --- متد اصلی ارتباط با پیام‌رسان بله ---
def upload_to_bale(file_path):
"""آپلود فایل صوتی در بله و دریافت مستقیم لینک دانلود دائمی"""
if not BALE_BOT_TOKEN:
logging.warning("Bale Token in settings is not configured.")
return None
try:
bale_chat_id = BALE_BOT_TOKEN.split(":")[0]
except Exception as e:
logging.error(f"Error parsing Bale Token chat_id: {e}")
return None
bale_url = f"https://tapi.bale.ai/bot{BALE_BOT_TOKEN}/sendDocument"
for attempt in range(3):
try:
with open(file_path, 'rb') as f:
files = {'document': (os.path.basename(file_path), f)}
data = {'chat_id': bale_chat_id}
resp = requests.post(bale_url, data=data, files=files, timeout=150)
if resp.status_code == 200:
bale_data = resp.json()
if bale_data.get("ok"):
result = bale_data.get("result", {})
doc = result.get("document", {})
file_id = doc.get("file_id")
if not file_id:
file_id = result.get("video", {}).get("file_id") or result.get("audio", {}).get("file_id")
if file_id:
return f"https://tapi.bale.ai/file/bot{BALE_BOT_TOKEN}/{file_id}"
else:
logging.warning(f"Bale HTTP server status: {resp.status_code}")
except Exception as e:
logging.error(f"Error uploading to Bale (attempt {attempt+1}): {e}")
time.sleep(2)
return None
# --- اسکنر دوره‌ای پاکسازی فضای هارددیسک ---
def periodic_cleanup():
"""حذف فایل‌های صوتی باقی‌مانده بالای ۵۰ مگابایت پس از ۳ روز"""
while True:
try:
now = time.time()
three_days_ago = now - (3 * 24 * 60 * 60)
# تمیز کردن موقت‌ها
temp_dir = "workspace/temp_downloads"
if os.path.exists(temp_dir):
for filename in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, filename)
if os.path.isfile(file_path) and os.path.getmtime(file_path) < three_days_ago:
try:
os.remove(file_path)
except Exception:
pass
# تمیز کردن پوشه استاتیک بالای ۵۰ مگابایت
static_dir = "workspace/static_audio"
if os.path.exists(static_dir):
for filename in os.listdir(static_dir):
file_path = os.path.join(static_dir, filename)
if os.path.isfile(file_path) and os.path.getmtime(file_path) < three_days_ago:
try:
os.remove(file_path)
except Exception:
pass
except Exception as e:
logging.error(f"Cleanup Worker Error: {e}")
time.sleep(3600)
# --- API Endpoints ---
@app.route('/')
def index():
return send_from_directory(STATIC_FOLDER, 'index.html')
# مسیر فایل‌های صوتی با پشتیبانی از ریدایرکت خودکار به بله در صورت حذف فیزیکی
@app.route('/static/audio/<filename>')
def serve_static_audio(filename):
local_path = os.path.join("workspace/static_audio", filename)
# ۱. اگر فایل به صورت فیزیکی روی دیسک موجود بود، آن را ارسال کن
if os.path.exists(local_path):
return send_from_directory("workspace/static_audio", filename)
# ۲. اگر فایل حذف شده بود، بررسی کن که آیا کش بله برای آن وجود دارد یا خیر
# استخراج تمیز شناسه پروژه (job_id) از نام فایل
job_id = filename.split('.')[0].split('_')[0]
cached_url = get_cached_bale_url(job_id)
if cached_url:
from flask import redirect
# هدایت مرورگر به آدرس استریم پراکسی‌شده بله به صورت کاملاً خودکار
return redirect(cached_url)
return "فایل صوتی مورد نظر یافت نشد.", 404
@app.route('/<path:path>')
def serve_static(path):
if os.path.exists(os.path.join(STATIC_FOLDER, path)):
return send_from_directory(STATIC_FOLDER, path)
return send_from_directory(STATIC_FOLDER, 'index.html')
@app.route('/api/proxy_bale')
def proxy_bale():
"""پراکسی هوشمند با استریم محلی استاندارد برای باز کردن گره پخش پلیرهای موبایل"""
url = request.args.get('url')
job_id = request.args.get('job_id', 'unknown')
filename = request.args.get('filename', 'audio.wav')
if not url or not url.startswith('https://tapi.bale.ai/'):
return "آدرس نامعتبر است.", 400
local_dir = "workspace/temp_downloads"
os.makedirs(local_dir, exist_ok=True)
# استفاده از نام فایل ایمن و همگام با استانداردهای کاراکتری سیستم‌عامل
ext = filename.split('.')[-1].lower() if '.' in filename else 'wav'
local_path = os.path.join(local_dir, f"{job_id}.{ext}")
# ۱. اگر فایل به صورت موقت کش نشده است، آن را از بله به صورت پرسرعت بارگیری کن
if not os.path.exists(local_path):
try:
resp = requests.get(url, timeout=30)
if resp.status_code == 200:
with open(local_path, 'wb') as f:
f.write(resp.content)
else:
return "خطا در واکشی فایل از بله", 404
except Exception as e:
return f"خطا در ارتباط با سرور: {str(e)}", 500
# ۲. تعیین نوع صوتی صریح MIME-Type برای حل باگ بلاک شدن مرورگرهای موبایل
mimetype = "audio/wav"
if ext == "mp3":
mimetype = "audio/mpeg"
elif ext == "ogg":
mimetype = "audio/ogg"
elif ext == "aac":
mimetype = "audio/aac"
# ۳. ارسال فایل با استفاده از سیستم پیشرفته Range-Request فلاسک برای هماهنگی با پلیرها
try:
response = send_file(local_path, mimetype=mimetype, conditional=True)
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Range, Content-Type'
return response
except Exception as e:
return str(e), 500
@app.route('/api/status/<job_id>', methods=['GET'])
def proxy_status(job_id):
"""رهگیری وضعیت رندر و بازسازی خروجی به لینک‌های دائمی بله یا موقت داخلی با پشتیبانی از HTTPS"""
cached_url = get_cached_bale_url(job_id)
if cached_url:
return jsonify({
"status": "ready",
"url": cached_url,
"message": "تکمیل شد (آرشیو دائمی)"
})
url = f"https://opera8-action.hf.space/api/status/{job_id}"
try:
resp = requests.get(url, timeout=10)
if resp.status_code != 200:
return jsonify({"status": "processing", "message": "ارتباط با سرور برقرار نشد"}), 200
data = resp.json()
if data.get("status") == "ready":
remote_relative_url = data.get("url")
if remote_relative_url:
if remote_relative_url.startswith("http"):
full_remote_url = remote_relative_url
else:
full_remote_url = f"https://opera8-action.hf.space{remote_relative_url}"
# دانلود و اندازه‌گیری حجم فایل صوتی
local_dir = "workspace/temp_downloads"
filename = remote_relative_url.split("/")[-1]
ext = filename.split('.')[-1].lower() if '.' in filename else 'wav'
local_path = os.path.join(local_dir, f"{job_id}.{ext}")
try:
file_resp = requests.get(full_remote_url, stream=True, timeout=30)
if file_resp.status_code == 200:
with open(local_path, 'wb') as f:
shutil.copyfileobj(file_resp.raw, f)
file_size = os.path.getsize(local_path)
bale_url = None
# اگر حجم زیر ۵۰ مگابایت بود و توکن تنظیم شده بود
if file_size < 52428800 and BALE_BOT_TOKEN:
bale_url = upload_to_bale(local_path)
if bale_url:
# حذف فایل موقت از دایرکتوری پس از آپلود موفق به بله برای بهینه‌سازی دیسک
try:
os.remove(local_path)
except Exception:
pass
# استفاده از متد امن get_secure_host_url برای تضمین ساخت آدرس HTTPS
proxied_url = f"{get_secure_host_url()}/api/proxy_bale?url={urllib.parse.quote(bale_url)}&job_id={job_id}&filename={urllib.parse.quote(filename)}"
save_cached_bale_url(job_id, proxied_url)
data["url"] = proxied_url
else:
# فایل‌های بالای ۵۰ مگابایت در مسیر موقت ذخیره شده و پس از ۳ روز خودکار حذف می‌شوند
local_static_dir = "workspace/static_audio"
final_local_path = os.path.join(local_static_dir, f"{job_id}.{ext}")
shutil.move(local_path, final_local_path)
local_served_url = f"{get_secure_host_url()}/static/audio/{job_id}.{ext}"
save_cached_bale_url(job_id, local_served_url)
data["url"] = local_served_url
except Exception as e:
logging.error(f"Error managing file download/upload for {job_id}: {e}")
data["url"] = full_remote_url
return jsonify(data)
except Exception as e:
logging.error(f"Error in proxy status API: {e}")
return jsonify({"status": "processing", "message": "خطا در بررسی از سرور پردازش موازی"}), 200
@app.route('/api/check-credit', methods=['POST'])
def check_credit():
data = request.get_json()
if not data: return jsonify({"error": "Invalid request"}), 400
user_id = get_user_identifier(data)
model_id = data.get('model_id', 'custom')
with cache_lock:
now = time.time()
one_day = 24 * 60 * 60
user_record = next((u for u in usage_data_cache if u.get('id') == user_id), None)
credits_remaining = USAGE_LIMIT_PER_MODEL
limit_reached = False
reset_timestamp = 0
if user_record:
if user_record.get('day_start', 0) < (now - one_day):
user_record['usage'] = {}
user_record['day_start'] = now
usage_dict = user_record.get('usage', {})
current_model_usage = usage_dict.get(model_id, 0)
credits_remaining = max(0, USAGE_LIMIT_PER_MODEL - current_model_usage)
if credits_remaining == 0:
limit_reached = True
reset_timestamp = user_record.get('day_start', now) + one_day
return jsonify({
"credits_remaining": credits_remaining,
"limit_reached": limit_reached,
"reset_timestamp": reset_timestamp,
"model_id": model_id
})
@app.route('/api/use-credit', methods=['POST'])
def use_credit():
data = request.get_json()
if not data: return jsonify({"error": "Invalid request"}), 400
user_id = get_user_identifier(data)
model_id = data.get('model_id', 'custom')
with cache_lock:
now = time.time()
one_day = 24 * 60 * 60
user_record = next((u for u in usage_data_cache if u.get('id') == user_id), None)
if user_record:
if user_record.get('day_start', 0) < (now - one_day):
user_record['usage'] = {}
user_record['day_start'] = now
if 'usage' not in user_record:
user_record['usage'] = {}
current_model_usage = user_record['usage'].get(model_id, 0)
if current_model_usage >= USAGE_LIMIT_PER_MODEL:
reset_timestamp = user_record.get('day_start', now) + one_day
return jsonify({
"status": "limit_reached",
"credits_remaining": 0,
"reset_timestamp": reset_timestamp
}), 429
user_record['usage'][model_id] = current_model_usage + 1
else:
user_record = {
"id": user_id,
"day_start": now,
"usage": {model_id: 1}
}
usage_data_cache.append(user_record)
credits_remaining = USAGE_LIMIT_PER_MODEL - user_record['usage'][model_id]
# در این بخش دیگر تریگر ذخیره در هاب گیت‌هاب زده نمی‌شود و فقط درون RAM است
return jsonify({"status": "success", "credits_remaining": credits_remaining})
# --- Main ---
if __name__ != '__main__':
load_initial_data()
t = threading.Thread(target=background_persister, daemon=True)
t.start()
t_cleanup = threading.Thread(target=periodic_cleanup, daemon=True)
t_cleanup.start()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860)