import os import uuid import requests import threading import time from datetime import date from flask import Flask, request, jsonify, render_template, Response, send_from_directory from itertools import cycle import io from pydub import AudioSegment # --- 1. تنظیمات اصلی --- # لیست ورکرها (سرورهای تولید صدا) WORKER_URLS = [ "https://hamed744-ttspro6.hf.space/generate", "https://hamed744-ttspro7.hf.space/generate", "https://hamed744-ttspro8.hf.space/generate", "https://hamed744-ttspro9.hf.space/generate", ] worker_pool = cycle(WORKER_URLS) MAX_CHUNK_LENGTH = 2500 # (این متغیر دیگر استفاده نمی‌شود چون تقسیم‌بندی حذف شد) LIVE_MODEL_CHAR_LIMIT = 500 # مرز تشخیص متن کوتاه (برای لایف) def get_next_worker_url(): return next(worker_pool) # --- تنظیمات محدودیت استفاده --- USAGE_LIMIT_GENERATE = 10 # محدودیت ۱۰ تولید در روز برای کاربران رایگان usage_data_cache = {} # تنظیم پوشه قالب‌ها و فایل‌های استاتیک به پوشه جاری app = Flask(__name__, template_folder='.', static_folder='.') jobs = {} lock = threading.Lock() # --- 2. توابع پردازش متن و صدا --- def split_text_into_chunks(text): # این تابع دیگر فراخوانی نمی‌شود اما جهت احتیاط در کد باقی ماند text = text.strip() if len(text) <= MAX_CHUNK_LENGTH: return [text] chunks, remaining_text = [], text while len(remaining_text) > 0: if len(remaining_text) <= MAX_CHUNK_LENGTH: chunks.append(remaining_text); break chunk_candidate = remaining_text[:MAX_CHUNK_LENGTH] split_index = -1 delimiters = ['\n', '.', '؟', '!', '؛', '،', ' '] for delimiter in delimiters: last_index = chunk_candidate.rfind(delimiter) if last_index != -1: split_index = last_index + 1; break if split_index == -1: split_index = MAX_CHUNK_LENGTH chunks.append(remaining_text[:split_index].strip()) remaining_text = remaining_text[split_index:].strip() return [chunk for chunk in chunks if chunk] def merge_audio_segments(audio_segments): if not audio_segments: return None combined = AudioSegment.empty() for segment in audio_segments: combined += segment output_buffer = io.BytesIO() combined.export(output_buffer, format="wav") output_buffer.seek(0) return output_buffer, "audio/wav" # --- 3. توابع ارتباط با ورکر --- def call_worker(index, chunk_payload): MAX_RETRIES = 3 for attempt in range(MAX_RETRIES): worker_url = get_next_worker_url() try: # تایم‌اوت را افزایش دادیم چون متن طولانی یکجا ارسال می‌شود و پردازش زمان‌بر است response = requests.post(worker_url, json=chunk_payload, timeout=1200) response.raise_for_status() content_type = response.headers.get('Content-Type', '') if 'audio' not in content_type: raise Exception("پاسخ دریافتی حاوی صدا نیست") audio_data = io.BytesIO(response.content) return index, AudioSegment.from_file(audio_data) except Exception as e: print(f"Error processing chunk {index} (Attempt {attempt+1}): {e}") if attempt < MAX_RETRIES - 1: time.sleep(2) return index, None def process_multipart_job(internal_job_id, payload): try: text = payload.get("text", "").strip() subscription_status = payload.get("subscriptionStatus", "free") fingerprint = payload.get("fingerprint", "") text_len = len(text) # ============================================================ # منطق انتخاب مدل (همه کاربران طبق قانون ۵۰۰ کاراکتر) # ============================================================ use_live_model = False # اگر متن کمتر از 500 کاراکتر باشد -> مدل Live # اگر متن بیشتر یا مساوی 500 کاراکتر باشد -> مدل Standard if text_len < LIVE_MODEL_CHAR_LIMIT: use_live_model = True print(f"Job {internal_job_id}: User Type: {subscription_status} | Length: {text_len}. Strategy: Live Model (<500 chars).") else: use_live_model = False print(f"Job {internal_job_id}: User Type: {subscription_status} | Length: {text_len}. Strategy: Standard Model (>500 chars).") # ============================================================ # آماده‌سازی متن (تغییر مهم: حذف تقسیم‌بندی) # کل متن به عنوان یک تکه (Chunk) در نظر گرفته می‌شود text_chunks = [text] num_chunks = len(text_chunks) chunk_payloads = [] for i, chunk in enumerate(text_chunks): p = payload.copy() p.pop('fingerprint', None) p.pop('subscriptionStatus', None) p.pop('force_standard', None) p['text'] = chunk p['use_live_model'] = use_live_model chunk_payloads.append((i, p)) with lock: jobs[internal_job_id]["status"] = "در حال پردازش..." results_map = {} threads = [] def worker_task(index, payload): idx, segment = call_worker(index, payload) with lock: results_map[idx] = segment # چون فقط یک تکه داریم، نیازی به شمارش پیشرفته نیست for i, p in chunk_payloads: thread = threading.Thread(target=worker_task, args=(i, p)) threads.append(thread) thread.start() for thread in threads: thread.join() sorted_segments = [results_map.get(i) for i in range(num_chunks)] successful_segments = [seg for seg in sorted_segments if seg is not None] if len(successful_segments) != num_chunks: raise Exception(f"خطا در پردازش متن.") # مرحله ادغام (حتی اگر یک تکه باشد، برای تبدیل فرمت مفید است) with lock: jobs[internal_job_id]["status"] = "در حال نهایی‌سازی..." final_audio_buffer, content_type = merge_audio_segments(successful_segments) with lock: job = jobs[internal_job_id] job["status"] = "completed" job["result_data"] = final_audio_buffer job["content_type"] = content_type print(f"Job {internal_job_id} Completed.") except Exception as e: print(f"Job {internal_job_id} Error: {e}") with lock: job = jobs.get(internal_job_id) if job: job["status"] = "error" job["result_data"] = str(e) # --- 4. API Endpoints --- @app.route('/') def index(): try: return render_template('index.html') except Exception as e: return f"TTS Manager Service is Running.
UI Not Found: {e}" @app.route('/') def serve_static(path): return send_from_directory('.', path) @app.route('/api/check-credit-tts', methods=['POST']) def check_credit_tts(): try: data = request.get_json() fingerprint = data.get('fingerprint') subscription_status = data.get('subscriptionStatus') if not fingerprint: return jsonify({"message": "Fingerprint is required."}), 400 if subscription_status == 'paid': return jsonify({ "credits_remaining": "unlimited", "limit_reached": False }) today_str = date.today().isoformat() user_record = usage_data_cache.get(fingerprint) if not user_record or user_record.get("last_reset") != today_str: user_record = {"count": 0, "last_reset": today_str} usage_data_cache[fingerprint] = user_record credits = max(0, USAGE_LIMIT_GENERATE - user_record["count"]) return jsonify({ "credits_remaining": credits, "limit_reached": credits <= 0 }) except Exception as e: return jsonify({"message": "Server Error"}), 500 @app.route('/api/generate', methods=['POST']) def submit_job(): payload = request.get_json() can_download = True # اعمال محدودیت فقط اگر 'paid' نباشد if payload.get('subscriptionStatus') != 'paid': fingerprint = payload.get('fingerprint') today_str = date.today().isoformat() user_record = usage_data_cache.get(fingerprint) if not user_record or user_record.get("last_reset") != today_str: user_record = {"count": 0, "last_reset": today_str} usage_data_cache[fingerprint] = user_record if user_record["count"] >= USAGE_LIMIT_GENERATE: return jsonify({"message": "سقف استفاده روزانه تکمیل شده است."}), 429 user_record["count"] += 1 internal_job_id = str(uuid.uuid4()) with lock: jobs[internal_job_id] = { "status": "در حال صف‌گذاری...", "result_data": None, "text": payload.get("text", "") } thread = threading.Thread(target=process_multipart_job, args=(internal_job_id, payload)) thread.start() return jsonify({"job_id": internal_job_id, "can_download": can_download}) @app.route('/api/check_status', methods=['POST']) def check_status(): data = request.get_json() job_id = data.get('job_id') with lock: job = jobs.get(job_id) if not job: return jsonify({"status": "not_found"}) resp = {"status": job["status"]} if job["status"] == "completed": resp["proxy_url"] = f"/proxy/{job_id}" elif job["status"] == "error": resp["result"] = job.get("result_data", "خطای نامشخص") return jsonify(resp) @app.route('/proxy/') def audio_proxy(job_id): with lock: job = jobs.get(job_id) if not job or job['status'] != 'completed' or not job.get('result_data'): return "Not Found", 404 job['result_data'].seek(0) return Response(job['result_data'].read(), content_type=job.get('content_type', 'audio/wav')) if __name__ == '__main__': port = int(os.environ.get('PORT', 7860)) app.run(host='0.0.0.0', port=port)