File size: 6,065 Bytes
37e7f4a
 
3412e09
37e7f4a
3412e09
 
 
36aeb6d
3412e09
36aeb6d
3412e09
 
 
 
 
 
 
36aeb6d
37e7f4a
 
 
 
 
 
3412e09
37e7f4a
 
 
 
3412e09
37e7f4a
 
 
3412e09
 
37e7f4a
 
3412e09
37e7f4a
3412e09
 
37e7f4a
 
 
3412e09
 
37e7f4a
3412e09
 
37e7f4a
3412e09
37e7f4a
3412e09
36aeb6d
 
 
 
3412e09
 
 
36aeb6d
 
3412e09
 
36aeb6d
b007df5
36aeb6d
 
 
3412e09
 
 
 
 
 
 
 
36aeb6d
3412e09
37e7f4a
 
3412e09
 
37e7f4a
3412e09
 
 
 
 
 
 
 
 
 
37e7f4a
3412e09
37e7f4a
3412e09
37e7f4a
3412e09
 
 
 
 
 
 
 
37e7f4a
3412e09
 
 
 
 
37e7f4a
3412e09
 
 
49e6533
3412e09
 
 
 
 
 
 
 
 
49e6533
3412e09
 
37e7f4a
3412e09
 
 
 
 
37e7f4a
3412e09
36aeb6d
37e7f4a
3412e09
37e7f4a
3412e09
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37e7f4a
 
3412e09
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
MediaTranscriberPro - Hugging Face Space
Final Fix for DNS/IPv6 Issues
"""
# ---------------------------------------------------------
# LAYER 1: SYSTEM SOCKET PATCH (Must be at the very top)
# ---------------------------------------------------------
import socket
import os

# Prefer IPv4 for every socket connection made by this process.
# The original resolver is kept so the wrapper can delegate to it.
old_getaddrinfo = socket.getaddrinfo


def new_getaddrinfo(*args, **kwargs):
    """Resolve like ``socket.getaddrinfo`` but keep only the IPv4 entries.

    All arguments are forwarded unchanged to the original resolver.

    Returns:
        The ``AF_INET`` entries of the lookup.  When the lookup produced no
        IPv4 entry at all (IPv6-only host, or the caller explicitly asked
        for another family) the unfiltered result is returned instead, so
        callers never receive an empty address list just because of the
        filter.
    """
    responses = old_getaddrinfo(*args, **kwargs)
    ipv4_only = [entry for entry in responses if entry[0] == socket.AF_INET]
    # Filtering everything away would leave callers with nothing to connect
    # to; fall back to whatever the resolver returned.
    return ipv4_only or responses


socket.getaddrinfo = new_getaddrinfo
# ---------------------------------------------------------

import gradio as gr
import logging
import tempfile
import shutil
import subprocess
import re
import yt_dlp
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Callable

# Logging Setup: timestamped INFO-level records on the root logger,
# plus a module-level logger for this file.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Constants
# Known media file extensions (lower-case, with the leading dot).
SUPPORTED_MEDIA = {
    # audio containers / codecs
    ".mp3",
    ".wav",
    ".m4a",
    ".aac",
    ".ogg",
    ".opus",
    ".flac",
    # video containers
    ".mp4",
    ".mkv",
    ".avi",
    ".mov",
    ".webm",
}

@dataclass
class Result:
    """Outcome of an operation: a success flag plus optional payload fields.

    Within this file it is produced by ``MediaDownloader.download``, which
    fills ``file_path`` on success and ``error`` on failure.
    """
    # True when the operation succeeded, False otherwise.
    success: bool
    # Optional textual payload; not set by any code visible in this file.
    data: Optional[str] = None
    # Path of the produced file (as a string) when the operation succeeded.
    file_path: Optional[str] = None
    # Human-readable error description when the operation failed.
    error: Optional[str] = None

class MediaDownloader:
    """Download the best available audio track of a media URL with yt-dlp.

    Files are written into ``output_dir``, which is created on construction.
    """

    # Suffixes yt-dlp uses for unfinished downloads / bookkeeping files;
    # such files must never be reported as the finished download.
    _PARTIAL_SUFFIXES = (".part", ".ytdl")

    def __init__(self, output_dir):
        """Store ``output_dir`` (``str`` or ``Path``) and create it if missing."""
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def _newest_download(self):
        """Return the most recently modified finished file in ``output_dir``.

        Sub-directories and partial-download artefacts are ignored.
        Returns ``None`` when no file qualifies.
        """
        candidates = [
            entry
            for entry in self.output_dir.iterdir()
            if entry.is_file() and entry.suffix not in self._PARTIAL_SUFFIXES
        ]
        if not candidates:
            return None
        return max(candidates, key=lambda entry: entry.stat().st_mtime)

    def download(self, url, progress=None):
        """Download ``url`` into ``output_dir``.

        Args:
            url: Address of the media to fetch.
            progress: Optional callable invoked as ``progress(fraction, text)``
                to report status.

        Returns:
            ``Result`` with ``success=True`` and ``file_path`` set to the
            downloaded file, or ``success=False`` and ``error`` set to a
            message.  Exceptions raised during the download are logged and
            reported through the result instead of propagating.
        """
        try:
            # Compare against None explicitly: a progress tracker object is
            # not guaranteed to support truth-value testing.
            if progress is not None:
                progress(0.1, "Initializing download...")

            # LAYER 2: YT-DLP SPECIFIC OPTIONS
            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': str(self.output_dir / '%(title)s.%(ext)s'),
                'noplaylist': True,
                # Bind to an IPv4 source address so the library only uses
                # IPv4; this is the API equivalent of the --force-ipv4 flag
                # (a 'force_ipv4' key is not a YoutubeDL option).
                'source_address': '0.0.0.0',
                # Skips SSL certificate errors.
                # NOTE(review): this disables TLS certificate verification
                # for every request made by the downloader.
                'nocheckcertificate': True,
                # Longer network wait before giving up (seconds).
                'socket_timeout': 30,
                'quiet': True,
                'no_warnings': True,
                # LAYER 3: USER AGENT SPOOFING
                # Custom headers must be passed through 'http_headers';
                # a bare 'user_agent' key is not read by YoutubeDL.
                'http_headers': {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
                },
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                file_path = Path(ydl.prepare_filename(info))

            # The name on disk can differ from the predicted one; fall back
            # to the newest finished file in the output directory.
            if not file_path.exists():
                file_path = self._newest_download()
                if file_path is None:
                    return Result(False, error="Download finished but file not found.")

            return Result(True, file_path=str(file_path))

        except Exception as e:
            # Boundary handler: the caller shows the message to the user.
            logger.error("Download Error: %s", e)
            return Result(False, error=str(e))

class Processor:
    """Turn an uploaded file or a media URL into a transcript and SRT subtitles."""

    def __init__(self):
        """Create a private temp directory and the downloader that uses it."""
        self.tmp = Path(tempfile.mkdtemp())
        self.downloader = MediaDownloader(self.tmp / "download")

        # Lazy load whisper to save startup time
        self.model = None

    def load_model(self):
        """Instantiate the Whisper model on first use; later calls are no-ops."""
        if self.model is None:
            from faster_whisper import WhisperModel
            self.model = WhisperModel("medium", device="cpu", compute_type="int8")

    @staticmethod
    def _upload_path(upload):
        """Return the filesystem path of an uploaded file.

        Accepts a path (``str`` / ``os.PathLike``) or an object exposing the
        path through a ``name`` attribute (e.g. a temp-file wrapper).
        """
        if isinstance(upload, (str, os.PathLike)):
            return Path(upload)
        return Path(upload.name)

    @staticmethod
    def _srt_timestamp(seconds):
        """Format a time offset in seconds as an SRT ``HH:MM:SS,mmm`` stamp."""
        # Work in integer milliseconds so the fractional part is kept
        # instead of being dropped.
        total_ms = max(0, int(round(seconds * 1000)))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        secs, millis = divmod(remainder, 1000)
        return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"

    def run(self, url, upload, lang, progress=gr.Progress()):
        """Transcribe an uploaded file (preferred) or the media behind ``url``.

        Args:
            url: Media URL, used only when no file was uploaded.
            upload: Uploaded file (path or object with ``name``), or empty.
            lang: Language code such as ``"ar"`` or ``"en-US"``; only the part
                before the first ``-`` is used.  Empty means no language is
                passed to the model.
            progress: Progress reporter called as ``progress(fraction, text)``.

        Returns:
            ``(status_message, txt_path, srt_path)``; both paths are ``None``
            when nothing was produced.
        """
        downloaded_file = None
        try:
            # 1. Acquire Media
            if upload:
                target_file = self._upload_path(upload)
            elif url:
                res = self.downloader.download(url, progress)
                if not res.success:
                    return f"❌ Error: {res.error}", None, None
                target_file = downloaded_file = Path(res.file_path)
            else:
                return "Please provide URL or File", None, None

            # 2. Transcribe
            progress(0.3, "Loading Model...")
            self.load_model()

            progress(0.5, "Transcribing...")
            lang_code = lang.split("-")[0] if lang else None
            segments, _ = self.model.transcribe(str(target_file), language=lang_code, beam_size=5)

            # Collect result
            texts = []
            srt_entries = []
            for index, seg in enumerate(segments, 1):
                # Strip once and use the same text for both outputs, so the
                # transcript does not end up with doubled spaces.
                text = seg.text.strip()
                texts.append(text)
                start = self._srt_timestamp(seg.start)
                end = self._srt_timestamp(seg.end)
                srt_entries.append(f"{index}\n{start} --> {end}\n{text}\n")

            text_str = " ".join(texts)
            srt_str = "\n".join(srt_entries)

            # Save files in a directory unique to this run so that results of
            # different runs never overwrite each other.
            run_dir = Path(tempfile.mkdtemp(dir=self.tmp))
            out_txt = run_dir / "transcript.txt"
            out_srt = run_dir / "subs.srt"
            out_txt.write_text(text_str, encoding="utf-8")
            out_srt.write_text(srt_str, encoding="utf-8")

            return f"✅ Done! ({len(text_str)} chars)", str(out_txt), str(out_srt)

        except Exception as e:
            # Boundary handler: keep the traceback in the log, show the
            # message to the user.
            logger.exception("Transcription failed")
            return f"❌ Critical Error: {str(e)}", None, None
        finally:
            # Downloaded media is only needed for transcription; remove it so
            # the download directory does not grow without bound.  Uploaded
            # files are left alone.
            if downloaded_file is not None:
                try:
                    downloaded_file.unlink()
                except OSError as cleanup_error:
                    logger.warning("Could not remove %s: %s", downloaded_file, cleanup_error)

# UI Setup
# One shared Processor instance backs every request of the interface.
proc = Processor()

with gr.Blocks(title="Transcriber Pro") as demo:
    gr.Markdown("## 🎙️ Media Transcriber Pro (IPv4 Fix)")

    # Input row: either a URL or an uploaded file.
    with gr.Row():
        url_in = gr.Textbox(label="YouTube URL")
        file_in = gr.File(label="Upload File")

    lang_in = gr.Dropdown(choices=["ar", "en"], value="ar", label="Language")
    btn = gr.Button(value="Transcribe", variant="primary")

    # Output area: status line followed by the two result files.
    status = gr.Textbox(label="Status")
    with gr.Row():
        f1 = gr.File(label="TXT")
        f2 = gr.File(label="SRT")

    # Wire the button to the processor: (url, file, language) -> (status, txt, srt).
    btn.click(
        fn=proc.run,
        inputs=[url_in, file_in, lang_in],
        outputs=[status, f1, f2],
    )

# Start the web server only when executed as a script.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)