# app.py import gradio as gr import yt_dlp import tempfile import os import shutil import speech_recognition as sr from pydub import AudioSegment import time import warnings import uuid import logging from threading import Lock from fastapi import FastAPI # تنظیمات لاگ‌گیری سطح دیباگ logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) warnings.filterwarnings("ignore") # ذخیره سشن‌ها و متون استخراج‌شده session_map = {} transcriptions_db = {} lock = Lock() def convert_to_mp3_and_transcribe(video_url, language, progress=gr.Progress(), request: gr.Request = None): """ دانلود ویدیو، تبدیل به MP3 و استخراج متن به زبان انتخابی. """ session_hash = request.session_hash if request else str(uuid.uuid4()) logger.debug(f"[{session_hash}] شروع درخواست با زبان {language}") if not video_url: return None, None, "لینک ویدیو را وارد کنید." try: progress(0, desc="در حال دانلود ویدیو...") logger.debug(f"[{session_hash}] در حال دانلود ویدیو از: {video_url}") ydl_opts = { 'format': 'bestaudio[ext=m4a]/bestaudio/best', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192', }], 'outtmpl': 'temp.%(ext)s', 'quiet': True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([video_url]) # پیدا کردن mp3 mp3_file = None for file in os.listdir('.'): if file.startswith('temp.') and file.endswith('.mp3'): mp3_file = file break if not mp3_file: logger.warning(f"[{session_hash}] MP3 دانلود نشد!") return None, None, "خطا در دانلود یا تبدیل." temp_mp3 = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) temp_mp3.close() shutil.copy2(mp3_file, temp_mp3.name) os.remove(mp3_file) logger.debug(f"[{session_hash}] MP3 آماده: {temp_mp3.name} | سایز: {os.path.getsize(temp_mp3.name)//1024}KB") progress(0.3, desc="ویرایش صوت...") text, error_msg = transcribe_audio( temp_mp3.name, progress, language, session_hash ) if text is None: logger.warning(f"[{session_hash}] استخراج متن ناموفق بود: {error_msg}") return temp_mp3.name, None, f"MP3 آماده. متن استخراج نشد. {error_msg or ''}" logger.info(f"[{session_hash}] استخراج متن موفقیت‌آمیز: {text[:100]}...") progress(1.0, desc="استخراج متن کامل شد!") with lock: transcriptions_db[session_hash] = text return temp_mp3.name, text, f"Success - Session Hash: {session_hash}" except Exception as e: logger.error(f"[{session_hash}] خطای کلی: {str(e)}", exc_info=True) return None, None, f"خطای سیستمی — لاگ لاگیرنده را ببینید." def transcribe_audio(mp3_path, progress, language, session_hash, chunk_length_ms=55000, overlap_ms=5000): recognizer = sr.Recognizer() recognizer.energy_threshold = 300 recognizer.dynamic_energy_threshold = True recognizer.pause_threshold = 0.6 full_text = [] bad_chunks = 0 total_chunks = 0 temp_wav_dir = tempfile.mkdtemp() audio = AudioSegment.from_mp3(mp3_path) duration_ms = len(audio) if duration_ms == 0: return None, "صدایی پیدا نشد." step_size = chunk_length_ms - overlap_ms if step_size <= 0: step_size = chunk_length_ms // 2 num_chunks = max(1, (duration_ms // step_size) + 1) logger.debug(f"[{session_hash}] مدت فایل: {duration_ms/1000:.2f}s، تعداد chunkها: {num_chunks}") progress(0.5, desc="در حال تقسیم صوت...") i = 0 chunk_idx = 1 while i < duration_ms: end_pos = min(i + chunk_length_ms, duration_ms) chunk = audio[i:end_pos] if len(chunk) < 2000: logger.debug(f"[{session_hash}] chunk {chunk_idx} خیلی کوتاه، متوقف می‌شود.") break temp_wav = os.path.join(temp_wav_dir, f"chunk_{i}.wav") chunk.export(temp_wav, format="wav") try: progress(0.5 + (i / duration_ms) * 0.5, desc=f"در حال استخراج chunk {chunk_idx}/{num_chunks}...") with sr.AudioFile(temp_wav) as source: recognizer.adjust_for_ambient_noise(source, duration=0.5) audio_data = recognizer.record(source) text = recognizer.recognize_google(audio_data, language=language) if not text.strip(): text = "[سکوت]" logger.debug(f"[{session_hash}] chunk {chunk_idx}: {text[:50]}...") full_text.append(text) except sr.UnknownValueError: logger.warning(f"[{session_hash}] chunk {chunk_idx} نامشخص.") full_text.append("[نامشخص]") except sr.RequestError as e: logger.warning(f"[{session_hash}] chunk {chunk_idx} RequestError: {str(e)}") full_text.append("[خطای شبکه]") except Exception as e: logger.error(f"[{session_hash}] chunk {chunk_idx} خطای داخلی: {str(e)}", exc_info=True) full_text.append("[خطای داخلی]") if os.path.exists(temp_wav): os.remove(temp_wav) i += step_size chunk_idx += 1 shutil.rmtree(temp_wav_dir, ignore_errors=True) final = " ".join(full_text).strip() if not final: return None, "هیچ متنی استخراج نشد." return final, None def query_transcription(session_hash): if not session_hash: return "لطفا session hash معتبر وارد کنید." with lock: text = transcriptions_db.get(session_hash) if not text: return "none" logger.debug(f"[{session_hash}] درخواست استعلام متن.") return text # رابط گرافی Gradio with gr.Blocks() as app: with gr.Tab("تبدیل ویدیو به صوت و متن"): gr.Interface( fn=convert_to_mp3_and_transcribe, inputs=[ gr.Textbox(label="لینک ویدیو", placeholder="YouTube یا MP4..."), gr.Dropdown( choices=[("پارسی", "fa-IR"), ("انگلیسی", "en-US")], value="fa-IR", label="زبان متن" ) ], outputs=[ gr.File(label="دانلود MP3"), gr.Textbox(label="متن استخراج‌شده", lines=10), gr.Textbox(label="وضعیت") ], title="تبدیل ویدیو به MP3 و استخراج متن", examples=[ ["https://www.youtube.com/watch?v=5qap5aO4i9A", "fa-IR"], ["https://www.youtube.com/watch?v=dQw4w9WgXcQ", "en-US"] ] ) with gr.Tab("جستجوی متن بر اساس Session Hash"): with gr.Row(): textbox = gr.Textbox(label="Session Hash") btn = gr.Button("جستجوی متن") output = gr.Textbox(label="نتیجه", lines=10) btn.click(fn=query_transcription, inputs=textbox, outputs=output) # افزودن endpoint API برای دسترسی سریع @app.app.get("/api/text/{session_hash}") async def get_text(session_hash: str): with lock: text = transcriptions_db.get(session_hash) return {"session_hash": session_hash, "text": text or "Not Found"} # اجرای اپ if __name__ == "__main__": app.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=True # این کار را Gradio انجام می‌دهد. )