hfvd / app.py
suprimedev's picture
Update app.py
540971d verified
# app.py
import gradio as gr
import yt_dlp
import tempfile
import os
import shutil
import speech_recognition as sr
from pydub import AudioSegment
import time
import warnings
import uuid
import logging
from threading import Lock
from fastapi import FastAPI
# تنظیمات لاگ‌گیری سطح دیباگ
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore")
# ذخیره سشن‌ها و متون استخراج‌شده
session_map = {}
transcriptions_db = {}
lock = Lock()
def convert_to_mp3_and_transcribe(video_url, language, progress=gr.Progress(), request: gr.Request = None):
"""
دانلود ویدیو، تبدیل به MP3 و استخراج متن به زبان انتخابی.
"""
session_hash = request.session_hash if request else str(uuid.uuid4())
logger.debug(f"[{session_hash}] شروع درخواست با زبان {language}")
if not video_url:
return None, None, "لینک ویدیو را وارد کنید."
try:
progress(0, desc="در حال دانلود ویدیو...")
logger.debug(f"[{session_hash}] در حال دانلود ویدیو از: {video_url}")
ydl_opts = {
'format': 'bestaudio[ext=m4a]/bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'outtmpl': 'temp.%(ext)s',
'quiet': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
# پیدا کردن mp3
mp3_file = None
for file in os.listdir('.'):
if file.startswith('temp.') and file.endswith('.mp3'):
mp3_file = file
break
if not mp3_file:
logger.warning(f"[{session_hash}] MP3 دانلود نشد!")
return None, None, "خطا در دانلود یا تبدیل."
temp_mp3 = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
temp_mp3.close()
shutil.copy2(mp3_file, temp_mp3.name)
os.remove(mp3_file)
logger.debug(f"[{session_hash}] MP3 آماده: {temp_mp3.name} | سایز: {os.path.getsize(temp_mp3.name)//1024}KB")
progress(0.3, desc="ویرایش صوت...")
text, error_msg = transcribe_audio(
temp_mp3.name, progress, language, session_hash
)
if text is None:
logger.warning(f"[{session_hash}] استخراج متن ناموفق بود: {error_msg}")
return temp_mp3.name, None, f"MP3 آماده. متن استخراج نشد. {error_msg or ''}"
logger.info(f"[{session_hash}] استخراج متن موفقیت‌آمیز: {text[:100]}...")
progress(1.0, desc="استخراج متن کامل شد!")
with lock:
transcriptions_db[session_hash] = text
return temp_mp3.name, text, f"Success - Session Hash: {session_hash}"
except Exception as e:
logger.error(f"[{session_hash}] خطای کلی: {str(e)}", exc_info=True)
return None, None, f"خطای سیستمی — لاگ لاگیرنده را ببینید."
def transcribe_audio(mp3_path, progress, language, session_hash, chunk_length_ms=55000, overlap_ms=5000):
recognizer = sr.Recognizer()
recognizer.energy_threshold = 300
recognizer.dynamic_energy_threshold = True
recognizer.pause_threshold = 0.6
full_text = []
bad_chunks = 0
total_chunks = 0
temp_wav_dir = tempfile.mkdtemp()
audio = AudioSegment.from_mp3(mp3_path)
duration_ms = len(audio)
if duration_ms == 0:
return None, "صدایی پیدا نشد."
step_size = chunk_length_ms - overlap_ms
if step_size <= 0:
step_size = chunk_length_ms // 2
num_chunks = max(1, (duration_ms // step_size) + 1)
logger.debug(f"[{session_hash}] مدت فایل: {duration_ms/1000:.2f}s، تعداد chunkها: {num_chunks}")
progress(0.5, desc="در حال تقسیم صوت...")
i = 0
chunk_idx = 1
while i < duration_ms:
end_pos = min(i + chunk_length_ms, duration_ms)
chunk = audio[i:end_pos]
if len(chunk) < 2000:
logger.debug(f"[{session_hash}] chunk {chunk_idx} خیلی کوتاه، متوقف می‌شود.")
break
temp_wav = os.path.join(temp_wav_dir, f"chunk_{i}.wav")
chunk.export(temp_wav, format="wav")
try:
progress(0.5 + (i / duration_ms) * 0.5, desc=f"در حال استخراج chunk {chunk_idx}/{num_chunks}...")
with sr.AudioFile(temp_wav) as source:
recognizer.adjust_for_ambient_noise(source, duration=0.5)
audio_data = recognizer.record(source)
text = recognizer.recognize_google(audio_data, language=language)
if not text.strip():
text = "[سکوت]"
logger.debug(f"[{session_hash}] chunk {chunk_idx}: {text[:50]}...")
full_text.append(text)
except sr.UnknownValueError:
logger.warning(f"[{session_hash}] chunk {chunk_idx} نامشخص.")
full_text.append("[نامشخص]")
except sr.RequestError as e:
logger.warning(f"[{session_hash}] chunk {chunk_idx} RequestError: {str(e)}")
full_text.append("[خطای شبکه]")
except Exception as e:
logger.error(f"[{session_hash}] chunk {chunk_idx} خطای داخلی: {str(e)}", exc_info=True)
full_text.append("[خطای داخلی]")
if os.path.exists(temp_wav):
os.remove(temp_wav)
i += step_size
chunk_idx += 1
shutil.rmtree(temp_wav_dir, ignore_errors=True)
final = " ".join(full_text).strip()
if not final:
return None, "هیچ متنی استخراج نشد."
return final, None
def query_transcription(session_hash):
if not session_hash:
return "لطفا session hash معتبر وارد کنید."
with lock:
text = transcriptions_db.get(session_hash)
if not text:
return "none"
logger.debug(f"[{session_hash}] درخواست استعلام متن.")
return text
# رابط گرافی Gradio
with gr.Blocks() as app:
with gr.Tab("تبدیل ویدیو به صوت و متن"):
gr.Interface(
fn=convert_to_mp3_and_transcribe,
inputs=[
gr.Textbox(label="لینک ویدیو", placeholder="YouTube یا MP4..."),
gr.Dropdown(
choices=[("پارسی", "fa-IR"), ("انگلیسی", "en-US")],
value="fa-IR",
label="زبان متن"
)
],
outputs=[
gr.File(label="دانلود MP3"),
gr.Textbox(label="متن استخراج‌شده", lines=10),
gr.Textbox(label="وضعیت")
],
title="تبدیل ویدیو به MP3 و استخراج متن",
examples=[
["https://www.youtube.com/watch?v=5qap5aO4i9A", "fa-IR"],
["https://www.youtube.com/watch?v=dQw4w9WgXcQ", "en-US"]
]
)
with gr.Tab("جستجوی متن بر اساس Session Hash"):
with gr.Row():
textbox = gr.Textbox(label="Session Hash")
btn = gr.Button("جستجوی متن")
output = gr.Textbox(label="نتیجه", lines=10)
btn.click(fn=query_transcription, inputs=textbox, outputs=output)
# افزودن endpoint API برای دسترسی سریع
@app.app.get("/api/text/{session_hash}")
async def get_text(session_hash: str):
with lock:
text = transcriptions_db.get(session_hash)
return {"session_hash": session_hash, "text": text or "Not Found"}
# اجرای اپ
if __name__ == "__main__":
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
debug=True # این کار را Gradio انجام می‌دهد.
)