Spaces:
Running
Running
| # app.py | |
| import gradio as gr | |
| import yt_dlp | |
| import tempfile | |
| import os | |
| import shutil | |
| import speech_recognition as sr | |
| from pydub import AudioSegment | |
| import time | |
| import warnings | |
| import uuid | |
| import logging | |
| from threading import Lock | |
| from fastapi import FastAPI | |
| # تنظیمات لاگگیری سطح دیباگ | |
| logging.basicConfig(level=logging.DEBUG) | |
| logger = logging.getLogger(__name__) | |
| warnings.filterwarnings("ignore") | |
| # ذخیره سشنها و متون استخراجشده | |
| session_map = {} | |
| transcriptions_db = {} | |
| lock = Lock() | |
| def convert_to_mp3_and_transcribe(video_url, language, progress=gr.Progress(), request: gr.Request = None): | |
| """ | |
| دانلود ویدیو، تبدیل به MP3 و استخراج متن به زبان انتخابی. | |
| """ | |
| session_hash = request.session_hash if request else str(uuid.uuid4()) | |
| logger.debug(f"[{session_hash}] شروع درخواست با زبان {language}") | |
| if not video_url: | |
| return None, None, "لینک ویدیو را وارد کنید." | |
| try: | |
| progress(0, desc="در حال دانلود ویدیو...") | |
| logger.debug(f"[{session_hash}] در حال دانلود ویدیو از: {video_url}") | |
| ydl_opts = { | |
| 'format': 'bestaudio[ext=m4a]/bestaudio/best', | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'mp3', | |
| 'preferredquality': '192', | |
| }], | |
| 'outtmpl': 'temp.%(ext)s', | |
| 'quiet': True, | |
| } | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| ydl.download([video_url]) | |
| # پیدا کردن mp3 | |
| mp3_file = None | |
| for file in os.listdir('.'): | |
| if file.startswith('temp.') and file.endswith('.mp3'): | |
| mp3_file = file | |
| break | |
| if not mp3_file: | |
| logger.warning(f"[{session_hash}] MP3 دانلود نشد!") | |
| return None, None, "خطا در دانلود یا تبدیل." | |
| temp_mp3 = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) | |
| temp_mp3.close() | |
| shutil.copy2(mp3_file, temp_mp3.name) | |
| os.remove(mp3_file) | |
| logger.debug(f"[{session_hash}] MP3 آماده: {temp_mp3.name} | سایز: {os.path.getsize(temp_mp3.name)//1024}KB") | |
| progress(0.3, desc="ویرایش صوت...") | |
| text, error_msg = transcribe_audio( | |
| temp_mp3.name, progress, language, session_hash | |
| ) | |
| if text is None: | |
| logger.warning(f"[{session_hash}] استخراج متن ناموفق بود: {error_msg}") | |
| return temp_mp3.name, None, f"MP3 آماده. متن استخراج نشد. {error_msg or ''}" | |
| logger.info(f"[{session_hash}] استخراج متن موفقیتآمیز: {text[:100]}...") | |
| progress(1.0, desc="استخراج متن کامل شد!") | |
| with lock: | |
| transcriptions_db[session_hash] = text | |
| return temp_mp3.name, text, f"Success - Session Hash: {session_hash}" | |
| except Exception as e: | |
| logger.error(f"[{session_hash}] خطای کلی: {str(e)}", exc_info=True) | |
| return None, None, f"خطای سیستمی — لاگ لاگیرنده را ببینید." | |
| def transcribe_audio(mp3_path, progress, language, session_hash, chunk_length_ms=55000, overlap_ms=5000): | |
| recognizer = sr.Recognizer() | |
| recognizer.energy_threshold = 300 | |
| recognizer.dynamic_energy_threshold = True | |
| recognizer.pause_threshold = 0.6 | |
| full_text = [] | |
| bad_chunks = 0 | |
| total_chunks = 0 | |
| temp_wav_dir = tempfile.mkdtemp() | |
| audio = AudioSegment.from_mp3(mp3_path) | |
| duration_ms = len(audio) | |
| if duration_ms == 0: | |
| return None, "صدایی پیدا نشد." | |
| step_size = chunk_length_ms - overlap_ms | |
| if step_size <= 0: | |
| step_size = chunk_length_ms // 2 | |
| num_chunks = max(1, (duration_ms // step_size) + 1) | |
| logger.debug(f"[{session_hash}] مدت فایل: {duration_ms/1000:.2f}s، تعداد chunkها: {num_chunks}") | |
| progress(0.5, desc="در حال تقسیم صوت...") | |
| i = 0 | |
| chunk_idx = 1 | |
| while i < duration_ms: | |
| end_pos = min(i + chunk_length_ms, duration_ms) | |
| chunk = audio[i:end_pos] | |
| if len(chunk) < 2000: | |
| logger.debug(f"[{session_hash}] chunk {chunk_idx} خیلی کوتاه، متوقف میشود.") | |
| break | |
| temp_wav = os.path.join(temp_wav_dir, f"chunk_{i}.wav") | |
| chunk.export(temp_wav, format="wav") | |
| try: | |
| progress(0.5 + (i / duration_ms) * 0.5, desc=f"در حال استخراج chunk {chunk_idx}/{num_chunks}...") | |
| with sr.AudioFile(temp_wav) as source: | |
| recognizer.adjust_for_ambient_noise(source, duration=0.5) | |
| audio_data = recognizer.record(source) | |
| text = recognizer.recognize_google(audio_data, language=language) | |
| if not text.strip(): | |
| text = "[سکوت]" | |
| logger.debug(f"[{session_hash}] chunk {chunk_idx}: {text[:50]}...") | |
| full_text.append(text) | |
| except sr.UnknownValueError: | |
| logger.warning(f"[{session_hash}] chunk {chunk_idx} نامشخص.") | |
| full_text.append("[نامشخص]") | |
| except sr.RequestError as e: | |
| logger.warning(f"[{session_hash}] chunk {chunk_idx} RequestError: {str(e)}") | |
| full_text.append("[خطای شبکه]") | |
| except Exception as e: | |
| logger.error(f"[{session_hash}] chunk {chunk_idx} خطای داخلی: {str(e)}", exc_info=True) | |
| full_text.append("[خطای داخلی]") | |
| if os.path.exists(temp_wav): | |
| os.remove(temp_wav) | |
| i += step_size | |
| chunk_idx += 1 | |
| shutil.rmtree(temp_wav_dir, ignore_errors=True) | |
| final = " ".join(full_text).strip() | |
| if not final: | |
| return None, "هیچ متنی استخراج نشد." | |
| return final, None | |
| def query_transcription(session_hash): | |
| if not session_hash: | |
| return "لطفا session hash معتبر وارد کنید." | |
| with lock: | |
| text = transcriptions_db.get(session_hash) | |
| if not text: | |
| return "none" | |
| logger.debug(f"[{session_hash}] درخواست استعلام متن.") | |
| return text | |
| # رابط گرافی Gradio | |
| with gr.Blocks() as app: | |
| with gr.Tab("تبدیل ویدیو به صوت و متن"): | |
| gr.Interface( | |
| fn=convert_to_mp3_and_transcribe, | |
| inputs=[ | |
| gr.Textbox(label="لینک ویدیو", placeholder="YouTube یا MP4..."), | |
| gr.Dropdown( | |
| choices=[("پارسی", "fa-IR"), ("انگلیسی", "en-US")], | |
| value="fa-IR", | |
| label="زبان متن" | |
| ) | |
| ], | |
| outputs=[ | |
| gr.File(label="دانلود MP3"), | |
| gr.Textbox(label="متن استخراجشده", lines=10), | |
| gr.Textbox(label="وضعیت") | |
| ], | |
| title="تبدیل ویدیو به MP3 و استخراج متن", | |
| examples=[ | |
| ["https://www.youtube.com/watch?v=5qap5aO4i9A", "fa-IR"], | |
| ["https://www.youtube.com/watch?v=dQw4w9WgXcQ", "en-US"] | |
| ] | |
| ) | |
| with gr.Tab("جستجوی متن بر اساس Session Hash"): | |
| with gr.Row(): | |
| textbox = gr.Textbox(label="Session Hash") | |
| btn = gr.Button("جستجوی متن") | |
| output = gr.Textbox(label="نتیجه", lines=10) | |
| btn.click(fn=query_transcription, inputs=textbox, outputs=output) | |
| # افزودن endpoint API برای دسترسی سریع | |
| async def get_text(session_hash: str): | |
| with lock: | |
| text = transcriptions_db.get(session_hash) | |
| return {"session_hash": session_hash, "text": text or "Not Found"} | |
| # اجرای اپ | |
| if __name__ == "__main__": | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| debug=True # این کار را Gradio انجام میدهد. | |
| ) |