Ttsp2 / app.py
Hamed744's picture
Rename app (1).py to app.py
1c68ff4 verified
import os
import uuid
import requests
import threading
import time
from datetime import date
from flask import Flask, request, jsonify, render_template, Response
from itertools import cycle
import io
from pydub import AudioSegment
# --- 1. تنظیمات اصلی ---
WORKER_URLS = [
"https://hamed744-ttspro.hf.space/generate",
"https://hamed744-ttspro2.hf.space/generate",
"https://hamed744-ttspro3.hf.space/generate",
"https://hamed744-ttspro4.hf.space/generate",
"https://hamed744-ttspro5.hf.space/generate",
]
worker_pool = cycle(WORKER_URLS)
MAX_CHUNK_LENGTH = 2500
def get_next_worker_url():
return next(worker_pool)
# --- تنظیمات سیستم اعتبار ---
USAGE_LIMIT_TTS = 5
usage_data_cache = {}
app = Flask(__name__, template_folder='.', static_folder='.')
jobs = {}
lock = threading.Lock()
# ThreadPoolExecutor حذف شد چون Gunicorn خودش مدیریت می‌کند
# --- 2. توابع تقسیم و ادغام (بدون تغییر) ---
def split_text_into_chunks(text):
text = text.strip()
if len(text) <= MAX_CHUNK_LENGTH: return [text]
chunks, remaining_text = [], text
while len(remaining_text) > 0:
if len(remaining_text) <= MAX_CHUNK_LENGTH: chunks.append(remaining_text); break
chunk_candidate = remaining_text[:MAX_CHUNK_LENGTH]
split_index = -1
delimiters = ['\n', '.', '؟', '!', '؛', '،', ' ']
for delimiter in delimiters:
last_index = chunk_candidate.rfind(delimiter)
if last_index != -1: split_index = last_index + 1; break
if split_index == -1: split_index = MAX_CHUNK_LENGTH
chunks.append(remaining_text[:split_index].strip())
remaining_text = remaining_text[split_index:].strip()
return [chunk for chunk in chunks if chunk]
def merge_audio_segments(audio_segments):
if not audio_segments: return None
combined = AudioSegment.empty()
for segment in audio_segments: combined += segment
output_buffer = io.BytesIO()
combined.export(output_buffer, format="wav")
output_buffer.seek(0)
return output_buffer, "audio/wav"
# --- 3. توابع پس‌زمینه ---
def get_user_ip():
if request.headers.getlist("X-Forwarded-For"): return request.headers.getlist("X-Forwarded-For")[0].split(',')[0].strip()
return request.remote_addr
def call_worker(index, chunk_payload):
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
worker_url = get_next_worker_url()
try:
print(f"قطعه {index}, تلاش {attempt + 1}/{MAX_RETRIES} به کارگر: {worker_url}")
response = requests.post(worker_url, json=chunk_payload, timeout=900)
response.raise_for_status()
content_type = response.headers.get('Content-Type', '')
if 'audio' not in content_type: raise Exception("پاسخ صوتی از کارگر دریافت نشد")
audio_data = io.BytesIO(response.content)
return index, AudioSegment.from_file(audio_data)
except Exception as e:
print(f"خطا در قطعه {index}, تلاش {attempt + 1} ناموفق بود: {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(2)
return index, None
def process_multipart_job(internal_job_id, payload):
try:
text_chunks = split_text_into_chunks(payload.get("text", ""))
num_chunks = len(text_chunks)
print(f"Job {internal_job_id} به {num_chunks} قطعه تقسیم شد.")
chunk_payloads = []
for i, chunk in enumerate(text_chunks):
p = payload.copy()
p.pop('fingerprint', None); p.pop('subscriptionStatus', None); p['text'] = chunk
chunk_payloads.append((i, p))
with lock: jobs[internal_job_id]["status"] = f"در حال پردازش ۱ از {num_chunks}..."
# --- START: تغییر اصلی - استفاده از کتابخانه threading به جای ThreadPoolExecutor ---
# این روش با Gunicorn gthread سازگارتر است
results_map = {}
threads = []
def worker_task(index, payload):
idx, segment = call_worker(index, payload)
with lock:
results_map[idx] = segment
processed_count = len(results_map)
jobs[internal_job_id]["status"] = f"در حال پردازش {processed_count} از {num_chunks}..."
for i, p in chunk_payloads:
thread = threading.Thread(target=worker_task, args=(i, p))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
# --- END: تغییر اصلی ---
sorted_segments = [results_map.get(i) for i in range(num_chunks)]
successful_segments = [seg for seg in sorted_segments if seg is not None]
if len(successful_segments) != num_chunks:
raise Exception(f"خطا: {num_chunks - len(successful_segments)} قطعه با موفقیت پردازش نشد.")
with lock: jobs[internal_job_id]["status"] = "در حال ادغام فایل‌های صوتی..."
final_audio_buffer, content_type = merge_audio_segments(successful_segments)
with lock:
job = jobs[internal_job_id]
job["status"] = "completed"
job["result_data"] = final_audio_buffer
job["content_type"] = content_type
print(f"Job {internal_job_id} با موفقیت کامل شد.")
except Exception as e:
print(f"Job {internal_job_id} با خطا مواجه شد: {e}")
with lock:
job = jobs.get(internal_job_id)
if job:
job["status"] = "error"
job["result_data"] = str(e)
# --- 4. API Endpoints ---
@app.route('/')
def index(): return render_template('index.html')
@app.route('/api/check-credit-tts', methods=['POST'])
def check_credit_tts():
try:
data = request.get_json()
fingerprint, subscription_status = data.get('fingerprint'), data.get('subscriptionStatus')
if not fingerprint: return jsonify({"message": "Fingerprint is required."}), 400
if subscription_status == 'paid': return jsonify({"credits_remaining": "unlimited", "limit_reached": False})
today_str = date.today().isoformat()
user_record = usage_data_cache.get(fingerprint)
if not user_record or user_record.get("last_reset") != today_str:
user_record = {"count": 0, "last_reset": today_str}
usage_data_cache[fingerprint] = user_record
credits_remaining = max(0, USAGE_LIMIT_TTS - user_record["count"])
return jsonify({"credits_remaining": credits_remaining, "limit_reached": credits_remaining <= 0})
except Exception as e:
print(f"CRITICAL ERROR in check-credit-tts: {e}")
return jsonify({"message": "Internal Server Error"}), 500
@app.route('/api/generate', methods=['POST'])
def submit_job():
payload = request.get_json()
if payload.get('subscriptionStatus') != 'paid':
today_str, fingerprint = date.today().isoformat(), payload.get('fingerprint')
user_record = usage_data_cache.get(fingerprint)
if not user_record or user_record.get("last_reset") != today_str:
user_record = {"count": 0, "last_reset": today_str}
usage_data_cache[fingerprint] = user_record
if user_record["count"] >= USAGE_LIMIT_TTS:
return jsonify({"message": "شما به محدودیت روزانه خود برای تولید صدا رسیده‌اید..."}), 429
user_record["count"] += 1
internal_job_id = str(uuid.uuid4())
with lock:
# تغییر متن "در صف..." به "در حال آماده‌سازی..."
jobs[internal_job_id] = {"status": "در حال آماده‌سازی...", "result_data": None, "text": payload.get("text", "")}
# --- تغییر اصلی: اجرای مستقیم تابع در یک نخ جدید ---
# این کار به Gunicorn اجازه می‌دهد بلافاصله به کاربر پاسخ دهد
thread = threading.Thread(target=process_multipart_job, args=(internal_job_id, payload))
thread.start()
return jsonify({"job_id": internal_job_id})
@app.route('/api/check_status', methods=['POST'])
def check_status():
data = request.get_json()
job_id = data.get('job_id')
with lock: job = jobs.get(job_id)
if not job: return jsonify({"status": "not_found"})
response_data = {"status": job["status"]}
if job["status"] == "completed": response_data["proxy_url"] = f"/proxy/{job_id}"
elif job["status"] == "error": response_data["result"] = job.get("result_data", "خطای نامشخص")
return jsonify(response_data)
@app.route('/proxy/<job_id>')
def audio_proxy(job_id):
with lock: job = jobs.get(job_id)
if not job or job['status'] != 'completed' or not job.get('result_data'):
return "یافت نشد", 404
job['result_data'].seek(0)
return Response(job['result_data'].read(), content_type=job.get('content_type', 'audio/wav'))
if __name__ == '__main__':
port = int(os.environ.get('PORT', 7860))
app.run(host='0.0.0.0', port=port)