Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import uuid | |
| import time | |
| import asyncio | |
| import aiohttp | |
| import requests | |
| import subprocess | |
| import random | |
| import logging | |
| import base64 | |
| import io | |
| from flask import Flask, request, jsonify, send_file, render_template | |
| from flask_cors import CORS | |
| from werkzeug.utils import secure_filename | |
| import google.generativeai as genai | |
| from pydub import AudioSegment | |
| # ========================================== | |
| # تنظیمات سیستم لاگ (نمایش پیامها در کنسول) | |
| # ========================================== | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| app = Flask(__name__, template_folder='templates', static_folder='static') | |
| CORS(app) | |
| # ========================================== | |
| # بررسی کلیدها در لحظه شروع برنامه | |
| # ========================================== | |
| def load_api_keys(): | |
| """خواندن و بررسی کلیدها از تنظیمات""" | |
| raw_keys = os.environ.get("GEMINI_API_KEYS", "") | |
| # تمیزکاری کلیدها (حذف فاصله و خط جدید) | |
| keys_list = [k.strip() for k in raw_keys.split(',') if k.strip()] | |
| if keys_list: | |
| logger.info(f"✅ تعداد {len(keys_list)} کلید API جیمینای با موفقیت شناسایی شد.") | |
| # نمایش ۴ حرف آخر اولین کلید برای اطمینان (بدون لو رفتن کلید) | |
| first_key_preview = keys_list[0][-4:] if len(keys_list[0]) > 4 else "****" | |
| logger.info(f"ℹ️ نمونه کلید اول: ...{first_key_preview}") | |
| else: | |
| logger.error("❌ هیچ کلید API یافت نشد!") | |
| logger.warning("⚠️ لطفاً مطمئن شوید در بخش Settings > Secrets متغیر GEMINI_API_KEYS را مقداردهی کردهاید.") | |
| return keys_list | |
| # بارگذاری اولیه برای نمایش در لاگ | |
| GLOBAL_KEYS = load_api_keys() | |
| # ========================================== | |
| # تنظیمات مسیرها و API ها | |
| # ========================================== | |
| UPLOAD_FOLDER = 'uploads' | |
| TEMP_AUDIO_FOLDER = 'temp_audio' | |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
| os.makedirs(TEMP_AUDIO_FOLDER, exist_ok=True) | |
| # آدرس API پادکست (موتور تولید صدای پایه) | |
| PODCAST_API_URL = "https://ezmarynoori-podgen.hf.space/api/generate" | |
| # آدرس سرویس تغییر صدا (مدل اختصاصی) | |
| VC_SPACE_URL = "https://ezmarynoori-sada.hf.space" | |
| # لیست کامل ۳۰ گوینده جهت تشخیص و انتخاب توسط هوش مصنوعی | |
| CAST_PROMPT = """ | |
| AVAILABLE VOICE ACTORS (Use these IDs only for the "speaker_id" field): | |
| -- MALE VOICES (16 Options) --: | |
| 1. Charon | |
| 2. Achird | |
| 3. Zubenelgenubi | |
| 4. Rasalgethi | |
| 5. Sadachbia | |
| 6. Sadaltager | |
| 7. Alnilam | |
| 8. Schedar | |
| 9. Umbriel | |
| 10. Algieba | |
| 11. Algenib | |
| 12. Orus | |
| 13. Enceladus | |
| 14. Iapetus | |
| 15. Puck | |
| 16. Fenrir | |
| -- FEMALE VOICES (14 Options) --: | |
| 1. Zephyr | |
| 2. Vindemiatrix | |
| 3. Sulafat | |
| 4. Laomedeia | |
| 5. Achernar | |
| 6. Gacrux | |
| 7. Pulcherrima | |
| 8. Despina | |
| 9. Erinome | |
| 10. Aoede | |
| 11. Callirrhoe | |
| 12. Autonoe | |
| 13. Kore | |
| 14. Leda | |
| """ | |
| # ========================================== | |
| # توابع کمکی | |
| # ========================================== | |
| def get_video_duration(video_path): | |
| cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', video_path] | |
| return float(subprocess.check_output(cmd).decode('utf-8').strip()) | |
| def extract_audio(video_path): | |
| audio_path = video_path.rsplit('.', 1)[0] + '.mp3' | |
| subprocess.run(['ffmpeg', '-i', video_path, '-vn', '-acodec', 'mp3', '-y', audio_path], | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| return audio_path | |
| def generate_content_with_retry(prompt, audio_file_path): | |
| """مدیریت کلیدهای جمینای برای تحلیل ویدیو""" | |
| keys_list = load_api_keys() | |
| if not keys_list: | |
| raise Exception("هیچ کلید API در سیستم ثبت نشده است. لطفاً تنظیمات Secret را بررسی کنید.") | |
| random.shuffle(keys_list) | |
| last_exception = None | |
| for i, api_key in enumerate(keys_list): | |
| try: | |
| logger.info(f"🔄 تلاش با کلید شماره {i+1} (انتهای کلید: ...{api_key[-4:]})") | |
| genai.configure(api_key=api_key) | |
| model = genai.GenerativeModel('gemini-2.5-flash') | |
| logger.info("📤 در حال آپلود فایل صوتی به سرور گوگل...") | |
| uploaded_file = genai.upload_file(audio_file_path) | |
| while uploaded_file.state.name == "PROCESSING": | |
| time.sleep(1) | |
| uploaded_file = genai.get_file(uploaded_file.name) | |
| if uploaded_file.state.name == "FAILED": | |
| raise Exception("Google failed to process audio file.") | |
| logger.info("🤖 فایل پردازش شد. در حال ارسال درخواست به جمینای...") | |
| response = model.generate_content( | |
| [prompt, uploaded_file], | |
| generation_config={"response_mime_type": "application/json"} | |
| ) | |
| try: | |
| genai.delete_file(uploaded_file.name) | |
| except: | |
| pass | |
| logger.info("✅ پاسخ با موفقیت از جمینای دریافت شد.") | |
| return json.loads(response.text) | |
| except Exception as e: | |
| logger.error(f"❌ خطا با کلید شماره {i+1}: {e}") | |
| last_exception = e | |
| continue | |
| logger.critical("⛔ تمام کلیدهای API با خطا مواجه شدند.") | |
| raise Exception(f"تمام کلیدهای API ناموفق بودند. آخرین خطا: {str(last_exception)}") | |
| # ========================================== | |
| # تابع ارتباط با اسپیس تغییر صدا (مدل اختصاصی) | |
| # ========================================== | |
| def process_voice_conversion(tts_audio_io, ref_audio_base64): | |
| try: | |
| tts_audio_io.seek(0) | |
| # دیکد کردن Base64 صدای رفرنس کاربر | |
| if "," in ref_audio_base64: | |
| ref_audio_base64 = ref_audio_base64.split(",")[1] | |
| ref_bytes = base64.b64decode(ref_audio_base64) | |
| files = { | |
| 'source_audio': ('source.wav', tts_audio_io, 'audio/wav'), | |
| 'ref_audio': ('ref.wav', io.BytesIO(ref_bytes), 'audio/wav') | |
| } | |
| # 1. آپلود فایلها به سرویس VC | |
| logger.info(f"VC: Uploading to {VC_SPACE_URL}/upload") | |
| res = requests.post(f"{VC_SPACE_URL}/upload", files=files, timeout=120) | |
| if res.status_code != 200: | |
| raise Exception(f"VC Upload Failed: {res.text}") | |
| job_data = res.json() | |
| # 2. بررسی وضعیت پردازش (Polling تا ۸ دقیقه) | |
| for _ in range(120): | |
| time.sleep(4) | |
| chk = requests.post(f"{VC_SPACE_URL}/check_status", json=job_data, timeout=30) | |
| if chk.status_code == 200: | |
| stat = chk.json() | |
| if stat.get("status") == "completed": | |
| filename = stat.get("filename") | |
| # 3. دانلود فایل نهایی تغییر یافته | |
| dl = requests.get(f"{VC_SPACE_URL}/download/{filename}") | |
| if dl.status_code == 200: | |
| return io.BytesIO(dl.content) | |
| else: | |
| raise Exception("VC Download Failed") | |
| elif stat.get("status") == "failed": | |
| detail = stat.get("detail", "Unknown error") | |
| raise Exception(f"VC Remote Failed: {detail}") | |
| raise Exception("VC Timeout (Processing took too long)") | |
| except Exception as e: | |
| logger.error(f"VC Error: {e}") | |
| return None | |
| # ========================================== | |
| # تولید صدای اولیه (آسنکرون برای تحلیل اولیه) | |
| # ========================================== | |
| async def generate_audio_async(session, text, speaker, index): | |
| try: | |
| payload = {"text": text, "speaker": speaker, "temperature": 0.9} | |
| async with session.post(PODCAST_API_URL, json=payload, timeout=300) as resp: | |
| if resp.status == 200: | |
| audio_data = await resp.read() | |
| filename = f"seg_{uuid.uuid4()}.wav" | |
| path = os.path.join(TEMP_AUDIO_FOLDER, filename) | |
| with open(path, 'wb') as f: | |
| f.write(audio_data) | |
| return {"index": index, "status": "success", "file": filename} | |
| else: | |
| logger.error(f"Podcast API Error: {resp.status}") | |
| except Exception as e: | |
| logger.error(f"Error gen audio {index}: {e}") | |
| return {"index": index, "status": "failed"} | |
| async def batch_generate_audio(segments): | |
| sem = asyncio.Semaphore(20) | |
| async with aiohttp.ClientSession() as session: | |
| tasks = [] | |
| for i, seg in enumerate(segments): | |
| async def task_wrapper(s_idx, s_item): | |
| async with sem: | |
| return await generate_audio_async(session, s_item['text'], s_item['speaker_id'], s_idx) | |
| tasks.append(task_wrapper(i, seg)) | |
| return await asyncio.gather(*tasks) | |
| # ========================================== | |
| # مسیرهای (Routes) وب سرور | |
| # ========================================== | |
| def index(): | |
| return render_template('index.html') | |
| def serve_static(filename): | |
| return send_file(os.path.join('static', filename)) | |
| def serve_uploads(filename): | |
| return send_file(os.path.join(UPLOAD_FOLDER, filename)) | |
| def serve_audio(filename): | |
| return send_file(os.path.join(TEMP_AUDIO_FOLDER, filename)) | |
| def analyze_video(): | |
| video_file = request.files.get('video_file') | |
| target_lang = request.form.get('language', 'Persian') | |
| try: | |
| if video_file: | |
| logger.info("📥 دریافت فایل آپلودی...") | |
| filename = secure_filename(f"{uuid.uuid4()}_{video_file.filename}") | |
| video_path = os.path.join(UPLOAD_FOLDER, filename) | |
| video_file.save(video_path) | |
| else: | |
| return jsonify({"error": "No video provided"}), 400 | |
| logger.info("🎵 در حال استخراج صدا از ویدیو...") | |
| audio_path = extract_audio(video_path) | |
| prompt = f""" | |
| You are a Dubbing Director. | |
| {CAST_PROMPT} | |
| TASK: | |
| 1. Identify speakers in the audio. | |
| 2. Assign a Voice Actor ID from the list to each segment based on gender/tone. | |
| 3. Translate the dialogue to {target_lang}. | |
| 4. Return a JSON Array. | |
| Format: | |
| [ | |
| {{"start": 0.0, "end": 4.5, "speaker_id": "Charon", "text": "Translated text..."}}, | |
| ... | |
| ] | |
| """ | |
| script = generate_content_with_retry(prompt, audio_path) | |
| logger.info(f"🎙️ شروع تولید صدا برای {len(script)} قطعه...") | |
| results = asyncio.run(batch_generate_audio(script)) | |
| for res_item in results: | |
| idx = res_item['index'] | |
| if res_item['status'] == 'success': | |
| script[idx]['audio_file'] = res_item['file'] | |
| else: | |
| script[idx]['audio_file'] = None | |
| logger.info("✅ تحلیل و تولید اولیه تمام شد.") | |
| return jsonify({ | |
| "video_filename": os.path.basename(video_path), | |
| "script": script | |
| }) | |
| except Exception as e: | |
| logger.error(f"❌ خطای کلی سیستم: {str(e)}") | |
| return jsonify({"error": str(e)}), 500 | |
| def regenerate_segment(): | |
| data = request.get_json() | |
| text = data.get('text') | |
| speaker = data.get('speaker_id') | |
| ref_base64 = data.get('ref_audio_base64') | |
| is_custom = bool(str(speaker).startswith("custom_") and ref_base64) | |
| # اگر گوینده اختصاصی است، ابتدا با صدای پایه Charon درخواست میزنیم | |
| actual_speaker = "Charon" if is_custom else speaker | |
| try: | |
| logger.info(f"🎙️ در حال تولید صدای پایه برای: {actual_speaker}") | |
| resp = requests.post( | |
| PODCAST_API_URL, | |
| json={"text": text, "speaker": actual_speaker, "temperature": 0.9}, | |
| timeout=300 | |
| ) | |
| if resp.status_code == 200: | |
| audio_buffer = io.BytesIO(resp.content) | |
| # اگر کاربر صدای اختصاصی خواسته بود، خروجی به اسپیس VC فرستاده میشود | |
| if is_custom: | |
| logger.info("🔄 در حال ارتباط با اسپیس Voice Conversion (تغییر صدا)...") | |
| vc_buffer = process_voice_conversion(audio_buffer, ref_base64) | |
| if not vc_buffer: | |
| return jsonify({"error": "عملیات تغییر به صدای اختصاصی شکست خورد"}), 500 | |
| audio_buffer = vc_buffer | |
| # ذخیره فایل روی سرور | |
| filename = f"seg_{uuid.uuid4()}.wav" | |
| path = os.path.join(TEMP_AUDIO_FOLDER, filename) | |
| with open(path, 'wb') as f: | |
| f.write(audio_buffer.read()) | |
| return jsonify({"audio_file": filename}) | |
| else: | |
| return jsonify({"error": "شکست در تولید صدای پایه"}), 500 | |
| except Exception as e: | |
| logger.error(f"❌ خطا در جایگزینی قطعه: {str(e)}") | |
| return jsonify({"error": str(e)}), 500 | |
| def render_final(): | |
| data = request.get_json() | |
| video_filename = data.get('video_filename') | |
| script = data.get('script') | |
| video_path = os.path.join(UPLOAD_FOLDER, video_filename) | |
| if not os.path.exists(video_path): return jsonify({"error": "Video not found"}), 404 | |
| try: | |
| logger.info("🎬 شروع رندر نهایی ویدیو...") | |
| video_duration = get_video_duration(video_path) | |
| final_audio = AudioSegment.silent(duration=int(video_duration * 1000)) | |
| for seg in script: | |
| if not seg.get('audio_file'): continue | |
| seg_path = os.path.join(TEMP_AUDIO_FOLDER, seg['audio_file']) | |
| if not os.path.exists(seg_path): continue | |
| audio = AudioSegment.from_file(seg_path) | |
| target_dur_ms = (seg['end'] - seg['start']) * 1000 | |
| current_dur_ms = len(audio) | |
| if current_dur_ms > 0: | |
| speed = current_dur_ms / target_dur_ms | |
| speed = max(0.6, min(2.0, speed)) | |
| if abs(speed - 1.0) > 0.05: | |
| temp_out = seg_path.replace('.wav', '_speed.wav') | |
| atempo = f"atempo={speed}" | |
| if speed > 2.0: atempo = "atempo=2.0,atempo={}".format(speed/2) | |
| elif speed < 0.5: atempo = "atempo=0.5,atempo={}".format(speed/0.5) | |
| subprocess.run(['ffmpeg', '-y', '-i', seg_path, '-filter:a', atempo, temp_out], | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| if os.path.exists(temp_out): | |
| audio = AudioSegment.from_file(temp_out) | |
| start_ms = int(seg['start'] * 1000) | |
| final_audio = final_audio.overlay(audio, position=start_ms) | |
| final_mix_path = os.path.join(UPLOAD_FOLDER, f"mix_{uuid.uuid4()}.wav") | |
| final_audio.export(final_mix_path, format="wav") | |
| final_video_path = os.path.join(UPLOAD_FOLDER, f"dubbed_{uuid.uuid4()}.mp4") | |
| cmd = [ | |
| 'ffmpeg', '-y', | |
| '-i', video_path, | |
| '-i', final_mix_path, | |
| '-c:v', 'copy', | |
| '-c:a', 'aac', | |
| '-map', '0:v:0', | |
| '-map', '1:a:0', | |
| '-shortest', | |
| final_video_path | |
| ] | |
| subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| logger.info("✅ رندر نهایی با موفقیت انجام شد.") | |
| return jsonify({"download_url": f"/uploads/{os.path.basename(final_video_path)}"}) | |
| except Exception as e: | |
| logger.error(f"❌ خطا در رندر نهایی: {str(e)}") | |
| return jsonify({"error": str(e)}), 500 | |
| if __name__ == '__main__': | |
| port = int(os.environ.get('PORT', 7860)) | |
| app.run(host='0.0.0.0', port=port) |