"""Flask front-end that fans long text-to-speech requests out to worker services.

A submitted text is split into chunks, each chunk is sent to one of the
worker URLs in round-robin order, and the returned audio segments are merged
into a single WAV file that the client downloads through a proxy endpoint.
"""
import io
import os
import threading
import time
import uuid
from datetime import date
from itertools import cycle

import requests
from flask import Flask, request, jsonify, render_template, Response
from pydub import AudioSegment

# --- 1. Core settings ---
WORKER_URLS = [
    "https://hamed744-ttspro.hf.space/generate",
    "https://hamed744-ttspro2.hf.space/generate",
    "https://hamed744-ttspro3.hf.space/generate",
    "https://hamed744-ttspro4.hf.space/generate",
    "https://hamed744-ttspro5.hf.space/generate",
]
worker_pool = cycle(WORKER_URLS)
MAX_CHUNK_LENGTH = 2500


def get_next_worker_url():
    """Return the next worker endpoint in round-robin order."""
    return next(worker_pool)


# --- Credit system settings ---
USAGE_LIMIT_TTS = 5
# Maps fingerprint -> {"count": int, "last_reset": ISO date string}.
usage_data_cache = {}

app = Flask(__name__, template_folder='.', static_folder='.')
# Maps job id -> job record ("status", "result_data", "text", "content_type").
# NOTE(review): nothing in the code visible here ever removes entries, so
# finished audio buffers stay in memory for the life of the process.
jobs = {}
# Guards both `jobs` and `usage_data_cache`. It is a plain (non-reentrant)
# Lock, so helpers documented as "caller holds the lock" must not acquire it.
lock = threading.Lock()
# ThreadPoolExecutor was removed because Gunicorn manages threading itself.


# --- 2. Split and merge helpers ---
def split_text_into_chunks(text):
    """Split *text* into chunks of at most MAX_CHUNK_LENGTH characters.

    Delimiters are tried in priority order (newline, sentence punctuation,
    Persian semicolon/comma, space). The first delimiter that occurs anywhere
    in the current window wins, and the cut is made right after its last
    occurrence. If no delimiter is present, the window is cut hard at
    MAX_CHUNK_LENGTH.

    Returns:
        A list of stripped chunks. Text that already fits is returned as a
        single-element list (even when it is empty); for longer text, empty
        chunks are dropped.
    """
    text = text.strip()
    if len(text) <= MAX_CHUNK_LENGTH:
        return [text]

    delimiters = ['\n', '.', '؟', '!', '؛', '،', ' ']
    chunks, remaining_text = [], text
    while len(remaining_text) > 0:
        if len(remaining_text) <= MAX_CHUNK_LENGTH:
            chunks.append(remaining_text)
            break
        chunk_candidate = remaining_text[:MAX_CHUNK_LENGTH]
        split_index = -1
        for delimiter in delimiters:
            last_index = chunk_candidate.rfind(delimiter)
            if last_index != -1:
                # Keep the delimiter with the chunk it terminates.
                split_index = last_index + 1
                break
        if split_index == -1:
            split_index = MAX_CHUNK_LENGTH
        chunks.append(remaining_text[:split_index].strip())
        remaining_text = remaining_text[split_index:].strip()
    return [chunk for chunk in chunks if chunk]


def merge_audio_segments(audio_segments):
    """Concatenate audio segments and export them as one WAV stream.

    Returns:
        ``(buffer, "audio/wav")`` where *buffer* is a BytesIO rewound to the
        start, or ``None`` when *audio_segments* is empty.
    """
    if not audio_segments:
        return None
    combined = AudioSegment.empty()
    for segment in audio_segments:
        combined += segment
    output_buffer = io.BytesIO()
    combined.export(output_buffer, format="wav")
    output_buffer.seek(0)
    return output_buffer, "audio/wav"


# --- 3. Background functions ---
def get_user_ip():
    """Return the client IP, preferring the first X-Forwarded-For entry."""
    forwarded = request.headers.getlist("X-Forwarded-For")
    if forwarded:
        return forwarded[0].split(',')[0].strip()
    return request.remote_addr


def call_worker(index, chunk_payload):
    """Send one chunk to a worker, retrying on a different worker on failure.

    Args:
        index: Position of the chunk in the original text.
        chunk_payload: JSON-serialisable payload posted to the worker.

    Returns:
        ``(index, AudioSegment)`` on success, or ``(index, None)`` after all
        attempts have failed.
    """
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        # Each attempt takes the next worker, so a retry goes elsewhere.
        worker_url = get_next_worker_url()
        try:
            print(f"قطعه {index}, تلاش {attempt + 1}/{MAX_RETRIES} به کارگر: {worker_url}")
            response = requests.post(worker_url, json=chunk_payload, timeout=900)
            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '')
            if 'audio' not in content_type:
                raise Exception("پاسخ صوتی از کارگر دریافت نشد")
            audio_data = io.BytesIO(response.content)
            return index, AudioSegment.from_file(audio_data)
        except Exception as e:
            print(f"خطا در قطعه {index}, تلاش {attempt + 1} ناموفق بود: {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(2)
    return index, None


def process_multipart_job(internal_job_id, payload):
    """Run a whole TTS job: split, dispatch chunks in parallel, merge, store.

    Progress and the final outcome are written to ``jobs[internal_job_id]``
    under ``lock``. On success the record gets status ``"completed"`` plus
    ``result_data`` (BytesIO) and ``content_type``; on any failure it gets
    status ``"error"`` with the message in ``result_data``.
    """
    try:
        text_chunks = split_text_into_chunks(payload.get("text", ""))
        num_chunks = len(text_chunks)
        print(f"Job {internal_job_id} به {num_chunks} قطعه تقسیم شد.")

        chunk_payloads = []
        for i, chunk in enumerate(text_chunks):
            p = payload.copy()
            # Credit-system fields are not forwarded to the workers.
            p.pop('fingerprint', None)
            p.pop('subscriptionStatus', None)
            p['text'] = chunk
            chunk_payloads.append((i, p))

        with lock:
            jobs[internal_job_id]["status"] = f"در حال پردازش ۱ از {num_chunks}..."

        # Plain threading.Thread is used instead of ThreadPoolExecutor; the
        # original author found it more compatible with Gunicorn's gthread.
        results_map = {}
        threads = []

        def worker_task(chunk_index, chunk_payload):
            idx, segment = call_worker(chunk_index, chunk_payload)
            with lock:
                results_map[idx] = segment
                processed_count = len(results_map)
                jobs[internal_job_id]["status"] = f"در حال پردازش {processed_count} از {num_chunks}..."

        for i, p in chunk_payloads:
            thread = threading.Thread(target=worker_task, args=(i, p))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

        # Restore original chunk order before merging.
        sorted_segments = [results_map.get(i) for i in range(num_chunks)]
        successful_segments = [seg for seg in sorted_segments if seg is not None]

        if len(successful_segments) != num_chunks:
            raise Exception(f"خطا: {num_chunks - len(successful_segments)} قطعه با موفقیت پردازش نشد.")

        with lock:
            jobs[internal_job_id]["status"] = "در حال ادغام فایل‌های صوتی..."

        final_audio_buffer, content_type = merge_audio_segments(successful_segments)

        with lock:
            job = jobs[internal_job_id]
            job["status"] = "completed"
            job["result_data"] = final_audio_buffer
            job["content_type"] = content_type
        print(f"Job {internal_job_id} با موفقیت کامل شد.")
    except Exception as e:
        print(f"Job {internal_job_id} با خطا مواجه شد: {e}")
        with lock:
            job = jobs.get(internal_job_id)
            if job:
                job["status"] = "error"
                job["result_data"] = str(e)


def _get_json_object():
    """Return the request's JSON body as a dict, or ``{}`` if it is not one.

    Missing, malformed, ``null`` or non-object bodies all collapse to an
    empty dict so the endpoints can use ``.get`` safely.
    """
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _get_usage_record(fingerprint):
    """Return today's usage record for *fingerprint*, resetting stale ones.

    The caller must already hold ``lock``.
    """
    today_str = date.today().isoformat()
    user_record = usage_data_cache.get(fingerprint)
    if not user_record or user_record.get("last_reset") != today_str:
        user_record = {"count": 0, "last_reset": today_str}
        usage_data_cache[fingerprint] = user_record
    return user_record


# --- 4. API Endpoints ---
@app.route('/')
def index():
    """Serve the single-page front-end."""
    return render_template('index.html')


@app.route('/api/check-credit-tts', methods=['POST'])
def check_credit_tts():
    """Report how many free generations the fingerprint has left today."""
    try:
        data = _get_json_object()
        fingerprint, subscription_status = data.get('fingerprint'), data.get('subscriptionStatus')
        if not fingerprint:
            return jsonify({"message": "Fingerprint is required."}), 400
        if subscription_status == 'paid':
            return jsonify({"credits_remaining": "unlimited", "limit_reached": False})
        with lock:
            user_record = _get_usage_record(fingerprint)
            credits_remaining = max(0, USAGE_LIMIT_TTS - user_record["count"])
        return jsonify({"credits_remaining": credits_remaining, "limit_reached": credits_remaining <= 0})
    except Exception as e:
        print(f"CRITICAL ERROR in check-credit-tts: {e}")
        return jsonify({"message": "Internal Server Error"}), 500


@app.route('/api/generate', methods=['POST'])
def submit_job():
    """Charge one credit (free users), register a job and start it in a thread.

    Returns ``{"job_id": ...}`` immediately; the client polls
    ``/api/check_status`` for progress.
    """
    payload = _get_json_object()
    # NOTE(review): subscriptionStatus comes straight from the client body,
    # so the quota can be bypassed by sending 'paid' — verify server-side.
    if payload.get('subscriptionStatus') != 'paid':
        fingerprint = payload.get('fingerprint')
        if not fingerprint:
            # Without this, every fingerprint-less caller shares one quota
            # record; mirror the check-credit endpoint instead.
            return jsonify({"message": "Fingerprint is required."}), 400
        # Check-and-increment must be atomic across request threads.
        with lock:
            user_record = _get_usage_record(fingerprint)
            limit_reached = user_record["count"] >= USAGE_LIMIT_TTS
            if not limit_reached:
                user_record["count"] += 1
        if limit_reached:
            return jsonify({"message": "شما به محدودیت روزانه خود برای تولید صدا رسیده‌اید..."}), 429

    internal_job_id = str(uuid.uuid4())
    with lock:
        # Initial status reads "preparing..." rather than "queued...".
        jobs[internal_job_id] = {"status": "در حال آماده‌سازی...", "result_data": None, "text": payload.get("text", "")}

    # Run the job in its own thread so the request returns right away.
    thread = threading.Thread(target=process_multipart_job, args=(internal_job_id, payload))
    thread.start()

    return jsonify({"job_id": internal_job_id})


@app.route('/api/check_status', methods=['POST'])
def check_status():
    """Return the job's status, plus a download URL or an error message."""
    data = _get_json_object()
    job_id = data.get('job_id')
    with lock:
        job = jobs.get(job_id)
        if not job:
            return jsonify({"status": "not_found"})
        response_data = {"status": job["status"]}
        if job["status"] == "completed":
            response_data["proxy_url"] = f"/proxy/{job_id}"
        elif job["status"] == "error":
            response_data["result"] = job.get("result_data", "خطای نامشخص")
    return jsonify(response_data)


@app.route('/proxy/<job_id>')
def audio_proxy(job_id):
    """Stream the finished audio for *job_id*, or 404 if it is not ready."""
    with lock:
        job = jobs.get(job_id)
        if not job or job['status'] != 'completed' or not job.get('result_data'):
            return "یافت نشد", 404
        audio_buffer = job['result_data']
        content_type = job.get('content_type', 'audio/wav')
    # getvalue() ignores the stream position, so concurrent downloads of the
    # same job cannot disturb each other the way seek()+read() could.
    return Response(audio_buffer.getvalue(), content_type=content_type)


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port)