ArtSpace commited on
Commit
3412e09
·
verified ·
1 Parent(s): 133af89

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +107 -209
app.py CHANGED
@@ -1,19 +1,20 @@
1
  """
2
  MediaTranscriberPro - Hugging Face Space
3
- Standalone audio/video transcription with Gradio UI + API
4
  """
5
- # ============================================================================
6
- # 🔧 MAGIC PATCH: FORCE IPv4
7
- # هذا الكود يجبر التطبيق على استخدام IPv4 فقط لحل مشكلة DNS مع يوتيوب
8
  import socket
9
- def force_ipv4_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
10
- # Force usage of AF_INET (IPv4)
11
- return socket.getaddrinfo_original(host, port, socket.AF_INET, type, proto, flags)
12
 
13
- if not hasattr(socket, 'getaddrinfo_original'):
14
- socket.getaddrinfo_original = socket.getaddrinfo
15
- socket.getaddrinfo = force_ipv4_getaddrinfo
16
- # ============================================================================
 
 
 
17
 
18
  import gradio as gr
19
  import logging
@@ -21,246 +22,143 @@ import tempfile
21
  import shutil
22
  import subprocess
23
  import re
24
- import yt_dlp # سنستخدم المكتبة مباشرة الآن
25
  from pathlib import Path
26
  from dataclasses import dataclass
27
  from typing import Optional, Callable
28
- from enum import Enum
29
-
30
- # ======================== Configuration ========================
31
- SUPPORTED_AUDIO = {".mp3", ".wav", ".m4a", ".aac", ".ogg", ".opus", ".flac"}
32
- SUPPORTED_VIDEO = {".mp4", ".mkv", ".avi", ".mov", ".webm"}
33
- SUPPORTED_MEDIA = SUPPORTED_AUDIO | SUPPORTED_VIDEO
34
-
35
- URL_PATTERNS = {
36
- "youtube": r"(?:https?://)?(?:www\.|m\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+",
37
- "facebook": r"(?:https?://)?(?:www\.|m\.|web\.)?facebook\.com/(?:watch/?\?v=|videos/|posts/|reel/)",
38
- "googledrive": r"(?:https?://)?drive\.google\.com/(?:file/d/|open\?id=|uc\?id=)[\w-]+",
39
- }
40
 
 
41
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
42
  logger = logging.getLogger(__name__)
43
 
44
- # ======================== Data Models ========================
45
- @dataclass
46
- class DownloadResult:
47
- success: bool
48
- filepath: Optional[Path] = None
49
- error: Optional[str] = None
50
 
51
  @dataclass
52
- class ConversionResult:
53
  success: bool
54
- wav_path: Optional[Path] = None
 
55
  error: Optional[str] = None
56
 
57
- @dataclass
58
- class TranscriptionResult:
59
- success: bool
60
- text: str = ""
61
- segments: list = None
62
- error: Optional[str] = None
63
-
64
- # ======================== Core: MediaDownloader ========================
65
  class MediaDownloader:
66
- """Download using yt_dlp Python Library (More Robust)"""
67
-
68
- def __init__(self, output_dir: Optional[Path] = None):
69
- self.output_dir = Path(output_dir) if output_dir else Path(tempfile.mkdtemp())
70
  self.output_dir.mkdir(parents=True, exist_ok=True)
71
-
72
- def detect_url_type(self, url: str) -> Optional[str]:
73
- for platform, pattern in URL_PATTERNS.items():
74
- if re.search(pattern, url, re.IGNORECASE):
75
- return platform
76
- return None
77
-
78
- def download_url(self, url: str, progress_callback: Optional[Callable] = None) -> DownloadResult:
79
  try:
80
- platform = self.detect_url_type(url)
81
- if not platform:
82
- return DownloadResult(success=False, error="Unsupported URL")
83
 
84
- if progress_callback:
85
- progress_callback(0.1, "Initializing download...")
86
-
87
- # إعدادات التحميل المباشرة
88
  ydl_opts = {
89
  'format': 'bestaudio/best',
90
  'outtmpl': str(self.output_dir / '%(title)s.%(ext)s'),
91
  'noplaylist': True,
92
- 'source_address': '0.0.0.0', # زيادة في التأكيد لإجبار IPv4
 
 
93
  'quiet': True,
94
  'no_warnings': True,
 
 
95
  }
96
 
97
- logger.info(f"Downloading from {platform}: {url}")
98
-
99
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
100
  info = ydl.extract_info(url, download=True)
101
  filename = ydl.prepare_filename(info)
102
- downloaded_file = Path(filename)
103
-
104
- if not downloaded_file.exists():
105
- # محاولة البحث اليدوي في حالة اختلاف الاسم
106
- files = list(self.output_dir.glob("*"))
107
- media_files = [f for f in files if f.suffix.lower() in SUPPORTED_MEDIA]
108
- if not media_files:
109
- return DownloadResult(success=False, error="File downloaded but not found!")
110
- downloaded_file = max(media_files, key=lambda x: x.stat().st_mtime)
111
 
112
- if progress_callback:
113
- progress_callback(1.0, "Download complete")
114
-
115
- return DownloadResult(success=True, filepath=downloaded_file)
116
-
117
- except Exception as e:
118
- logger.error(f"Download failed: {e}")
119
- return DownloadResult(success=False, error=str(e))
120
 
121
- # ======================== Core: AudioConverter ========================
122
- class AudioConverter:
123
- """Convert media to WAV using FFmpeg"""
124
-
125
- def convert(self, input_path: Path, progress_callback: Optional[Callable] = None) -> ConversionResult:
126
- try:
127
- if progress_callback:
128
- progress_callback(0.1, "Converting to WAV...")
129
-
130
- output_path = input_path.with_suffix('.wav')
131
-
132
- cmd = [
133
- "ffmpeg", "-i", str(input_path),
134
- "-ar", "16000",
135
- "-ac", "1",
136
- "-y",
137
- str(output_path)
138
- ]
139
-
140
- logger.info(f"Converting {input_path.name} to WAV")
141
- subprocess.run(cmd, capture_output=True, check=True)
142
-
143
- return ConversionResult(success=True, wav_path=output_path)
144
-
145
  except Exception as e:
146
- logger.error(f"Conversion failed: {e}")
147
- return ConversionResult(success=False, error=str(e))
148
 
149
- # ======================== Core: SpeechTranscriber ========================
150
- class SpeechTranscriber:
151
- """Transcribe audio using faster-whisper"""
152
-
153
- def __init__(self, model_size: str = "medium"):
154
- try:
 
 
 
 
155
  from faster_whisper import WhisperModel
156
- logger.info(f"Loading Whisper model: {model_size}")
157
- self.model = WhisperModel(model_size, device="cpu", compute_type="int8")
158
- except Exception as e:
159
- logger.error(f"Error loading model: {e}")
160
- self.model = None
161
-
162
- def transcribe(self, wav_path: Path, language: str = "ar", progress_callback: Optional[Callable] = None) -> TranscriptionResult:
163
- try:
164
- if not self.model:
165
- return TranscriptionResult(success=False, error="Model not loaded")
166
-
167
- if progress_callback:
168
- progress_callback(0.1, "Transcribing...")
169
-
170
- lang_code = language.split('-')[0] if '-' in language else language
171
-
172
- segments_iter, info = self.model.transcribe(
173
- str(wav_path),
174
- language=lang_code,
175
- beam_size=5,
176
- vad_filter=True
177
- )
178
- segments = list(segments_iter)
179
- text = " ".join([s.text for s in segments])
180
-
181
- return TranscriptionResult(success=True, text=text.strip(), segments=segments)
182
-
183
- except Exception as e:
184
- logger.error(f"Transcription failed: {e}")
185
- return TranscriptionResult(success=False, error=str(e))
186
 
187
- # ======================== Utilities ========================
188
- def generate_srt(segments, output_path: Path):
189
- with open(output_path, 'w', encoding='utf-8') as f:
190
- for i, seg in enumerate(segments, 1):
191
- start = format_timestamp(seg.start)
192
- end = format_timestamp(seg.end)
193
- f.write(f"{i}\n{start} --> {end}\n{seg.text.strip()}\n\n")
194
-
195
- def format_timestamp(seconds: float) -> str:
196
- hours = int(seconds // 3600)
197
- minutes = int((seconds % 3600) // 60)
198
- secs = int(seconds % 60)
199
- millis = int((seconds % 1) * 1000)
200
- return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
201
-
202
- # ======================== Main Pipeline ========================
203
- class TranscriptionPipeline:
204
- def __init__(self):
205
- self.temp_dir = Path(tempfile.mkdtemp())
206
- self.downloader = MediaDownloader(output_dir=self.temp_dir / "downloads")
207
- self.converter = AudioConverter()
208
- self.transcriber = SpeechTranscriber(model_size="medium")
209
-
210
- def process(self, source_url, upload_file, language, gen_txt, gen_srt, progress=gr.Progress()):
211
  try:
212
- # 1. Source
213
- if upload_file:
214
- source_path = Path(upload_file)
215
- elif source_url:
216
- res = self.downloader.download_url(source_url, lambda p, m: progress(p*0.3, desc=m))
217
- if not res.success: return f"Download Error: {res.error}", None, None, None
218
- source_path = res.filepath
 
219
  else:
220
- return "No input provided", None, None, None
 
 
 
 
221
 
222
- # 2. Convert
223
- res_conv = self.converter.convert(source_path, lambda p, m: progress(0.4, desc=m))
224
- if not res_conv.success: return f"Convert Error: {res_conv.error}", None, None, None
225
 
226
- # 3. Transcribe
227
- res_trans = self.transcriber.transcribe(res_conv.wav_path, language, lambda p, m: progress(0.7, desc=m))
228
- if not res_trans.success: return f"Transcribe Error: {res_trans.error}", None, None, None
 
 
 
 
 
 
229
 
230
- # 4. Output
231
- out_dir = self.temp_dir / "outputs"
232
- out_dir.mkdir(exist_ok=True)
233
 
234
- txt_file = str(out_dir / "transcription.txt")
235
- with open(txt_file, 'w', encoding='utf-8') as f: f.write(res_trans.text)
 
 
 
236
 
237
- srt_file = None
238
- if gen_srt:
239
- srt_file = str(out_dir / "subtitles.srt")
240
- generate_srt(res_trans.segments, Path(srt_file))
241
-
242
- return f"Done! Length: {len(res_trans.text)} chars", res_trans.text, txt_file, srt_file
243
 
244
  except Exception as e:
245
- return f"Error: {e}", None, None, None
246
 
247
- # ======================== Interface ========================
248
- def create_interface():
249
- pipeline = TranscriptionPipeline()
250
- with gr.Blocks() as demo:
251
- gr.Markdown("# MediaTranscriber Pro")
252
- with gr.Row():
253
- url = gr.Textbox(label="URL")
254
- upl = gr.File(label="Upload File")
255
- lang = gr.Dropdown(["ar", "en"], value="ar", label="Language")
256
- btn = gr.Button("Start")
257
- out_txt = gr.Textbox(label="Transcript", interactive=False) # Fixed interactive
258
- with gr.Row():
259
- dl_txt = gr.File(label="TXT")
260
- dl_srt = gr.File(label="SRT")
261
-
262
- btn.click(pipeline.process, [url, upl, lang, gr.Checkbox(value=True), gr.Checkbox(value=True)], [gr.Markdown(), out_txt, dl_txt, dl_srt])
263
- return demo
 
 
264
 
265
  if __name__ == "__main__":
266
- create_interface().launch(server_name="0.0.0.0", server_port=7860)
 
1
  """
2
  MediaTranscriberPro - Hugging Face Space
3
+ Final Fix for DNS/IPv6 Issues
4
  """
5
+ # ---------------------------------------------------------
6
+ # LAYER 1: SYSTEM SOCKET PATCH (Must be at the very top)
7
+ # ---------------------------------------------------------
8
  import socket
9
+ import os
 
 
10
 
11
+ # Force IPv4 for all socket connections
12
+ old_getaddrinfo = socket.getaddrinfo
13
+ def new_getaddrinfo(*args, **kwargs):
14
+ responses = old_getaddrinfo(*args, **kwargs)
15
+ return [response for response in responses if response[0] == socket.AF_INET]
16
+ socket.getaddrinfo = new_getaddrinfo
17
+ # ---------------------------------------------------------
18
 
19
  import gradio as gr
20
  import logging
 
22
  import shutil
23
  import subprocess
24
  import re
25
+ import yt_dlp
26
  from pathlib import Path
27
  from dataclasses import dataclass
28
  from typing import Optional, Callable
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
+ # Logging Setup
31
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
32
  logger = logging.getLogger(__name__)
33
 
34
+ # Constants
35
+ SUPPORTED_MEDIA = {".mp3", ".wav", ".m4a", ".aac", ".ogg", ".opus", ".flac", ".mp4", ".mkv", ".avi", ".mov", ".webm"}
 
 
 
 
36
 
37
  @dataclass
38
+ class Result:
39
  success: bool
40
+ data: Optional[str] = None
41
+ file_path: Optional[str] = None
42
  error: Optional[str] = None
43
 
 
 
 
 
 
 
 
 
44
  class MediaDownloader:
45
+ def __init__(self, output_dir):
46
+ self.output_dir = output_dir
 
 
47
  self.output_dir.mkdir(parents=True, exist_ok=True)
48
+
49
+ def download(self, url, progress=None):
 
 
 
 
 
 
50
  try:
51
+ if progress: progress(0.1, "Initializing download...")
 
 
52
 
53
+ # LAYER 2: YT-DLP SPECIFIC OPTIONS
 
 
 
54
  ydl_opts = {
55
  'format': 'bestaudio/best',
56
  'outtmpl': str(self.output_dir / '%(title)s.%(ext)s'),
57
  'noplaylist': True,
58
+ 'force_ipv4': True, # <--- يجبر المكتبة على استخدام IPv4
59
+ 'nocheckcertificate': True, # <--- يتجاوز أخطاء SSL
60
+ 'socket_timeout': 30, # <--- يزيد وقت الانتظار
61
  'quiet': True,
62
  'no_warnings': True,
63
+ # LAYER 3: USER AGENT SPOOFING
64
+ 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
65
  }
66
 
 
 
67
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
68
  info = ydl.extract_info(url, download=True)
69
  filename = ydl.prepare_filename(info)
70
+ file_path = Path(filename)
71
+
72
+ # Fallback check if filename differs
73
+ if not file_path.exists():
74
+ potential_files = list(self.output_dir.glob("*"))
75
+ if not potential_files:
76
+ return Result(False, error="Download finished but file not found.")
77
+ file_path = max(potential_files, key=lambda x: x.stat().st_mtime)
 
78
 
79
+ return Result(True, file_path=str(file_path))
 
 
 
 
 
 
 
80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
  except Exception as e:
82
+ logger.error(f"Download Error: {e}")
83
+ return Result(False, error=str(e))
84
 
85
+ class Processor:
86
+ def __init__(self):
87
+ self.tmp = Path(tempfile.mkdtemp())
88
+ self.downloader = MediaDownloader(self.tmp / "download")
89
+
90
+ # Lazy load whisper to save startup time
91
+ self.model = None
92
+
93
+ def load_model(self):
94
+ if not self.model:
95
  from faster_whisper import WhisperModel
96
+ self.model = WhisperModel("medium", device="cpu", compute_type="int8")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
 
98
+ def run(self, url, upload, lang, progress=gr.Progress()):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
  try:
100
+ # 1. Acquire Media
101
+ target_file = None
102
+ if upload:
103
+ target_file = Path(upload)
104
+ elif url:
105
+ res = self.downloader.download(url, progress)
106
+ if not res.success: return f"❌ Error: {res.error}", None, None
107
+ target_file = Path(res.file_path)
108
  else:
109
+ return "Please provide URL or File", None, None
110
+
111
+ # 2. Transcribe
112
+ progress(0.3, "Loading Model...")
113
+ self.load_model()
114
 
115
+ progress(0.5, "Transcribing...")
116
+ lang_code = lang.split("-")[0]
117
+ segments, _ = self.model.transcribe(str(target_file), language=lang_code, beam_size=5)
118
 
119
+ # Collect result
120
+ full_text = []
121
+ srt_content = []
122
+ for i, seg in enumerate(segments, 1):
123
+ full_text.append(seg.text)
124
+ # Simple SRT formatting
125
+ start = f"{int(seg.start//3600):02}:{int((seg.start%3600)//60):02}:{int(seg.start%60):02},000"
126
+ end = f"{int(seg.end//3600):02}:{int((seg.end%3600)//60):02}:{int(seg.end%60):02},000"
127
+ srt_content.append(f"{i}\n{start} --> {end}\n{seg.text.strip()}\n")
128
 
129
+ text_str = " ".join(full_text)
130
+ srt_str = "\n".join(srt_content)
 
131
 
132
+ # Save files
133
+ out_txt = self.tmp / "transcript.txt"
134
+ out_srt = self.tmp / "subs.srt"
135
+ out_txt.write_text(text_str, encoding="utf-8")
136
+ out_srt.write_text(srt_str, encoding="utf-8")
137
 
138
+ return f"✅ Done! ({len(text_str)} chars)", str(out_txt), str(out_srt)
 
 
 
 
 
139
 
140
  except Exception as e:
141
+ return f"❌ Critical Error: {str(e)}", None, None
142
 
143
+ # UI Setup
144
+ proc = Processor()
145
+
146
+ with gr.Blocks(title="Transcriber Pro") as demo:
147
+ gr.Markdown("## 🎙️ Media Transcriber Pro (IPv4 Fix)")
148
+
149
+ with gr.Row():
150
+ url_in = gr.Textbox(label="YouTube URL")
151
+ file_in = gr.File(label="Upload File")
152
+
153
+ lang_in = gr.Dropdown(["ar", "en"], value="ar", label="Language")
154
+ btn = gr.Button("Transcribe", variant="primary")
155
+
156
+ status = gr.Textbox(label="Status")
157
+ with gr.Row():
158
+ f1 = gr.File(label="TXT")
159
+ f2 = gr.File(label="SRT")
160
+
161
+ btn.click(proc.run, [url_in, file_in, lang_in], [status, f1, f2])
162
 
163
  if __name__ == "__main__":
164
+ demo.launch(server_name="0.0.0.0", server_port=7860)