Ttsp3 / app.py
Hamed744's picture
Update app.py
49433f0 verified
import os
import uuid
import requests
import threading
import time
from datetime import date
from flask import Flask, request, jsonify, render_template, Response, send_from_directory
from itertools import cycle
import io
from pydub import AudioSegment
# --- 1. تنظیمات اصلی ---
# لیست ورکرها (سرورهای تولید صدا)
WORKER_URLS = [
"https://hamed744-ttspro6.hf.space/generate",
"https://hamed744-ttspro7.hf.space/generate",
"https://hamed744-ttspro8.hf.space/generate",
"https://hamed744-ttspro9.hf.space/generate",
]
worker_pool = cycle(WORKER_URLS)
MAX_CHUNK_LENGTH = 2500 # (این متغیر دیگر استفاده نمی‌شود چون تقسیم‌بندی حذف شد)
LIVE_MODEL_CHAR_LIMIT = 500 # مرز تشخیص متن کوتاه (برای لایف)
def get_next_worker_url():
return next(worker_pool)
# --- تنظیمات محدودیت استفاده ---
USAGE_LIMIT_GENERATE = 10 # محدودیت ۱۰ تولید در روز برای کاربران رایگان
usage_data_cache = {}
# تنظیم پوشه قالب‌ها و فایل‌های استاتیک به پوشه جاری
app = Flask(__name__, template_folder='.', static_folder='.')
jobs = {}
lock = threading.Lock()
# --- 2. توابع پردازش متن و صدا ---
def split_text_into_chunks(text):
# این تابع دیگر فراخوانی نمی‌شود اما جهت احتیاط در کد باقی ماند
text = text.strip()
if len(text) <= MAX_CHUNK_LENGTH: return [text]
chunks, remaining_text = [], text
while len(remaining_text) > 0:
if len(remaining_text) <= MAX_CHUNK_LENGTH: chunks.append(remaining_text); break
chunk_candidate = remaining_text[:MAX_CHUNK_LENGTH]
split_index = -1
delimiters = ['\n', '.', '؟', '!', '؛', '،', ' ']
for delimiter in delimiters:
last_index = chunk_candidate.rfind(delimiter)
if last_index != -1: split_index = last_index + 1; break
if split_index == -1: split_index = MAX_CHUNK_LENGTH
chunks.append(remaining_text[:split_index].strip())
remaining_text = remaining_text[split_index:].strip()
return [chunk for chunk in chunks if chunk]
def merge_audio_segments(audio_segments):
if not audio_segments: return None
combined = AudioSegment.empty()
for segment in audio_segments: combined += segment
output_buffer = io.BytesIO()
combined.export(output_buffer, format="wav")
output_buffer.seek(0)
return output_buffer, "audio/wav"
# --- 3. توابع ارتباط با ورکر ---
def call_worker(index, chunk_payload):
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
worker_url = get_next_worker_url()
try:
# تایم‌اوت را افزایش دادیم چون متن طولانی یکجا ارسال می‌شود و پردازش زمان‌بر است
response = requests.post(worker_url, json=chunk_payload, timeout=1200)
response.raise_for_status()
content_type = response.headers.get('Content-Type', '')
if 'audio' not in content_type: raise Exception("پاسخ دریافتی حاوی صدا نیست")
audio_data = io.BytesIO(response.content)
return index, AudioSegment.from_file(audio_data)
except Exception as e:
print(f"Error processing chunk {index} (Attempt {attempt+1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(2)
return index, None
def process_multipart_job(internal_job_id, payload):
try:
text = payload.get("text", "").strip()
subscription_status = payload.get("subscriptionStatus", "free")
fingerprint = payload.get("fingerprint", "")
text_len = len(text)
# ============================================================
# منطق انتخاب مدل (همه کاربران طبق قانون ۵۰۰ کاراکتر)
# ============================================================
use_live_model = False
# اگر متن کمتر از 500 کاراکتر باشد -> مدل Live
# اگر متن بیشتر یا مساوی 500 کاراکتر باشد -> مدل Standard
if text_len < LIVE_MODEL_CHAR_LIMIT:
use_live_model = True
print(f"Job {internal_job_id}: User Type: {subscription_status} | Length: {text_len}. Strategy: Live Model (<500 chars).")
else:
use_live_model = False
print(f"Job {internal_job_id}: User Type: {subscription_status} | Length: {text_len}. Strategy: Standard Model (>500 chars).")
# ============================================================
# آماده‌سازی متن (تغییر مهم: حذف تقسیم‌بندی)
# کل متن به عنوان یک تکه (Chunk) در نظر گرفته می‌شود
text_chunks = [text]
num_chunks = len(text_chunks)
chunk_payloads = []
for i, chunk in enumerate(text_chunks):
p = payload.copy()
p.pop('fingerprint', None)
p.pop('subscriptionStatus', None)
p.pop('force_standard', None)
p['text'] = chunk
p['use_live_model'] = use_live_model
chunk_payloads.append((i, p))
with lock:
jobs[internal_job_id]["status"] = "در حال پردازش..."
results_map = {}
threads = []
def worker_task(index, payload):
idx, segment = call_worker(index, payload)
with lock:
results_map[idx] = segment
# چون فقط یک تکه داریم، نیازی به شمارش پیشرفته نیست
for i, p in chunk_payloads:
thread = threading.Thread(target=worker_task, args=(i, p))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
sorted_segments = [results_map.get(i) for i in range(num_chunks)]
successful_segments = [seg for seg in sorted_segments if seg is not None]
if len(successful_segments) != num_chunks:
raise Exception(f"خطا در پردازش متن.")
# مرحله ادغام (حتی اگر یک تکه باشد، برای تبدیل فرمت مفید است)
with lock: jobs[internal_job_id]["status"] = "در حال نهایی‌سازی..."
final_audio_buffer, content_type = merge_audio_segments(successful_segments)
with lock:
job = jobs[internal_job_id]
job["status"] = "completed"
job["result_data"] = final_audio_buffer
job["content_type"] = content_type
print(f"Job {internal_job_id} Completed.")
except Exception as e:
print(f"Job {internal_job_id} Error: {e}")
with lock:
job = jobs.get(internal_job_id)
if job:
job["status"] = "error"
job["result_data"] = str(e)
# --- 4. API Endpoints ---
@app.route('/')
def index():
try:
return render_template('index.html')
except Exception as e:
return f"TTS Manager Service is Running.<br>UI Not Found: {e}"
@app.route('/<path:path>')
def serve_static(path):
return send_from_directory('.', path)
@app.route('/api/check-credit-tts', methods=['POST'])
def check_credit_tts():
try:
data = request.get_json()
fingerprint = data.get('fingerprint')
subscription_status = data.get('subscriptionStatus')
if not fingerprint: return jsonify({"message": "Fingerprint is required."}), 400
if subscription_status == 'paid':
return jsonify({
"credits_remaining": "unlimited",
"limit_reached": False
})
today_str = date.today().isoformat()
user_record = usage_data_cache.get(fingerprint)
if not user_record or user_record.get("last_reset") != today_str:
user_record = {"count": 0, "last_reset": today_str}
usage_data_cache[fingerprint] = user_record
credits = max(0, USAGE_LIMIT_GENERATE - user_record["count"])
return jsonify({
"credits_remaining": credits,
"limit_reached": credits <= 0
})
except Exception as e:
return jsonify({"message": "Server Error"}), 500
@app.route('/api/generate', methods=['POST'])
def submit_job():
payload = request.get_json()
can_download = True
# اعمال محدودیت فقط اگر 'paid' نباشد
if payload.get('subscriptionStatus') != 'paid':
fingerprint = payload.get('fingerprint')
today_str = date.today().isoformat()
user_record = usage_data_cache.get(fingerprint)
if not user_record or user_record.get("last_reset") != today_str:
user_record = {"count": 0, "last_reset": today_str}
usage_data_cache[fingerprint] = user_record
if user_record["count"] >= USAGE_LIMIT_GENERATE:
return jsonify({"message": "سقف استفاده روزانه تکمیل شده است."}), 429
user_record["count"] += 1
internal_job_id = str(uuid.uuid4())
with lock:
jobs[internal_job_id] = {
"status": "در حال صف‌گذاری...",
"result_data": None,
"text": payload.get("text", "")
}
thread = threading.Thread(target=process_multipart_job, args=(internal_job_id, payload))
thread.start()
return jsonify({"job_id": internal_job_id, "can_download": can_download})
@app.route('/api/check_status', methods=['POST'])
def check_status():
data = request.get_json()
job_id = data.get('job_id')
with lock: job = jobs.get(job_id)
if not job: return jsonify({"status": "not_found"})
resp = {"status": job["status"]}
if job["status"] == "completed":
resp["proxy_url"] = f"/proxy/{job_id}"
elif job["status"] == "error":
resp["result"] = job.get("result_data", "خطای نامشخص")
return jsonify(resp)
@app.route('/proxy/<job_id>')
def audio_proxy(job_id):
with lock: job = jobs.get(job_id)
if not job or job['status'] != 'completed' or not job.get('result_data'):
return "Not Found", 404
job['result_data'].seek(0)
return Response(job['result_data'].read(), content_type=job.get('content_type', 'audio/wav'))
if __name__ == '__main__':
port = int(os.environ.get('PORT', 7860))
app.run(host='0.0.0.0', port=port)