File size: 9,802 Bytes
1d415e7
 
 
 
 
 
 
 
 
33336b0
1d415e7
3b1a061
 
33336b0
 
3b1a061
 
33336b0
3b1a061
6c215af
1d415e7
 
 
4f21041
3b1a061
 
 
 
 
 
1d415e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3b1a061
 
1d415e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a5e55c3
 
 
1d415e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3b1a061
1d415e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3b1a061
1d415e7
 
 
 
 
 
 
3b1a061
1d415e7
3b1a061
1d415e7
 
 
 
 
3b1a061
1d415e7
 
 
 
 
 
 
 
 
71a420f
 
 
 
 
 
1d415e7
 
 
 
 
 
3b1a061
 
 
 
 
1d415e7
 
 
 
 
 
 
 
 
 
 
 
 
3b1a061
 
 
 
 
 
 
 
1d415e7
 
3b1a061
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import gradio as gr
import torch
from faster_whisper import WhisperModel
import yt_dlp
from openai import OpenAI
import os
import json
import time
import uuid
import socket

# Probe DNS once at import time: the YouTube tab of the UI is only made
# interactive when 'www.youtube.com' can be resolved from this environment.
print("--- ATTEMPTING TO RESOLVE YOUTUBE.COM ---")
try:
    youtube_ip = socket.gethostbyname('www.youtube.com')
except socket.gaierror as dns_error:
    # Name resolution failed; keep the app running with YouTube input disabled.
    YOUTUBE_REACHABLE = False
    print(f"--- FAILED to resolve 'www.youtube.com': {dns_error}. YouTube functionality will be disabled. ---")
else:
    YOUTUBE_REACHABLE = True
    print(f"--- SUCCESS: 'www.youtube.com' resolved to {youtube_ip}. YouTube features enabled. ---")
    
# Load the faster-whisper model once at import time so every request reuses it.
# A CUDA device gets float16 weights; otherwise the model runs on CPU with int8.
print("Initializing transcription model (faster-whisper)...")
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"
model_size = "large-v3-turbo"
try:
    model = WhisperModel(model_size, device=device, compute_type=compute_type)
    print("Transcription model loaded successfully.")
except Exception as e:
    print(f"Error loading Whisper model: {e}")
    # The app cannot work without the model. Exit with a non-zero status so a
    # supervisor sees the failure; the site-provided exit() helper is intended
    # for interactive sessions and would have reported status 0 here.
    raise SystemExit(1) from e

def download_youtube_audio(url: str) -> str:
    """Download the audio of a YouTube video and return the MP3 file path.

    The file is written to the current working directory under a random
    UUID-based name so concurrent requests do not collide. The caller is
    responsible for deleting the file when it is no longer needed.

    Args:
        url: Address of the video to download.

    Returns:
        Relative path of the extracted ``<uuid>.mp3`` file.

    Raises:
        Whatever ``yt_dlp`` raises when the download or the audio extraction
        fails; nothing is caught here.
    """
    unique_id = uuid.uuid4()
    output_template = f'{unique_id}.%(ext)s'
    final_filepath = f'{unique_id}.mp3'
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192'}],
        'outtmpl': output_template,
        'quiet': True,
        # yt-dlp spells this option "overwrites"; the former "overwrite" key
        # was not a recognised option and had no effect.
        'overwrites': True,
        # A watch URL that also carries a playlist parameter must yield only
        # the single video, since every entry would share the same output name.
        'noplaylist': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return final_filepath

def parse_summary_json(raw_response):
    """Extract and decode the JSON object contained in an LLM reply.

    The reply may be bare JSON or JSON wrapped in a Markdown code fence
    (with or without a language tag, with or without surrounding text).
    Everything from the first ``{`` to the last ``}`` is decoded.

    Args:
        raw_response: Text returned by the model; ``None`` is treated as empty.

    Returns:
        The decoded JSON object as a ``dict``.

    Raises:
        ValueError: If no JSON object is present or it cannot be decoded
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    text = (raw_response or "").strip()
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in the model response")
    return json.loads(text[start:end + 1])


def build_summary_markdown(data):
    """Render the summary dictionary as the Markdown/HTML blog article.

    Args:
        data: Mapping with the optional keys ``title``, ``key_takeaway``,
            ``main_ideas`` (list of strings, or a single string) and
            ``conclusion``.

    Returns:
        The article markup shown in the summary pane.
    """
    import html  # local import: only this helper needs it

    def clean(value):
        # Model output is untrusted text placed inside HTML tags; escape it so
        # characters such as "<" or "&" are shown literally.
        return html.escape(str(value), quote=False)

    title = clean(data.get("title", "Title Not Found"))
    key_takeaway = clean(data.get("key_takeaway", ""))
    conclusion = clean(data.get("conclusion", ""))

    main_ideas = data.get("main_ideas") or []
    if isinstance(main_ideas, str):
        # A lone string would otherwise be iterated character by character.
        main_ideas = [main_ideas]
    list_items = "".join(f"<li>{clean(idea)}</li>" for idea in main_ideas)

    return (
        f"# {title}\n\n<p>{key_takeaway}</p>\n\n## Key Ideas\n\n"
        f"<ul>{list_items}</ul>\n\n## Conclusion\n\n<p>{conclusion}</p>"
    )


def transcribe_and_summarize(audio_file: str, youtube_url: str):
    """Transcribe audio and stream a blog-style summary to the UI.

    This is a generator: every ``yield`` is a 3-tuple
    ``(log text, transcription text, summary markup)`` matching the three
    output components wired to the submit button. The log accumulates, so
    each yield carries the full history so far.

    Args:
        audio_file: Path of an uploaded audio file, or ``None``.
        youtube_url: YouTube link; when non-empty it takes precedence over
            ``audio_file`` and the audio is downloaded first.

    Yields:
        Progress tuples as described above. Error conditions that the user
        can fix (missing API key, no speech, unparsable AI reply) are
        reported through the summary pane instead of raising.

    Raises:
        gr.Error: If neither an audio file nor a YouTube link was supplied.
    """
    log_history = ""

    def log(message):
        """Append a timestamped line to the log and return the whole log."""
        nonlocal log_history
        timestamp = time.strftime("%H:%M:%S")
        log_history += f"[{timestamp}] {message}\n"
        return log_history

    loading_message = "⏳ Generating summary..."
    yield log("Process started."), "", ""

    api_key = os.getenv('TYPHOON_API')
    if not api_key:
        error_msg = "## Error\n`TYPHOON_API` environment variable not set. Please configure the API key."
        yield log(error_msg.replace("\n", " ")), "", gr.Markdown(error_msg)
        return

    if audio_file is None and not youtube_url:
        raise gr.Error("Please upload an audio file or provide a YouTube link.")

    filepath = ""
    is_downloaded = False
    try:
        if youtube_url:
            yield log("Downloading YouTube audio..."), "", ""
            filepath = download_youtube_audio(youtube_url)
            is_downloaded = True
            yield log(f"Downloaded to {filepath}"), "", ""
        else:
            filepath = audio_file

        yield log("Transcription started (autodetecting language)..."), "", ""
        segments, info = model.transcribe(filepath, beam_size=5, task="transcribe")
        yield log(f"Detected language '{info.language}' (prob={info.language_probability:.2f})"), "", ""
        transcribed_text = ""
        for segment in segments:
            line = f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text.strip()}"
            transcribed_text += segment.text + " "
            # Stream each segment so the UI shows the transcript growing.
            yield log(line), transcribed_text, ""

        yield log("Transcription complete."), transcribed_text, ""

        if not transcribed_text.strip():
            # Nothing was recognised; asking the LLM to summarise an empty
            # transcript would only produce invented content.
            error_msg = "## Error\nNo speech was detected in the audio, so there is nothing to summarize."
            yield log(error_msg.replace("\n", " ")), transcribed_text, gr.Markdown(error_msg)
            return

        yield log("Sending to AI for summarization..."), transcribed_text, loading_message

        client = OpenAI(api_key=api_key, base_url="https://api.opentyphoon.ai/v1")
        system_prompt = f"""You are an automated system that converts transcripts into a blog post.
Your ONLY function is to output a valid JSON object. All text values in the JSON MUST be in the Thai language.
หน้าที่เดียวของคุณคือการส่งออกอ็อบเจกต์ JSON ที่ถูกต้อง โดยค่าที่เป็นข้อความทั้งหมดต้องเป็นภาษาไทยเท่านั้น
Do NOT write any explanations. The response MUST start with `{{` and end with `}}`.

The JSON object must have the following structure:
{{
  "title": "หัวข้อบทความที่น่าสนใจและเกี่ยวข้อง (เป็นภาษาไทย)",
  "key_takeaway": "สรุปใจความสำคัญของเนื้อหาทั้งหมดในหนึ่งย่อหน้า (เป็นภาษาไทย)",
  "main_ideas": [
    "ประเด็นหลักหรือใจความสำคัญ (เป็นภาษาไทย)",
    "ประเด็นหลักถัดไป...",
    "และต่อไปเรื่อยๆ..."
  ],
  "conclusion": "ย่อหน้าสรุปปิดท้าย (เป็นภาษาไทย)"
}}"""
        response = client.chat.completions.create(
            model="typhoon-v2.1-12b-instruct",
            messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": transcribed_text}],
            max_tokens=2048,
            temperature=0.7
        )

        try:
            data = parse_summary_json(response.choices[0].message.content)
        except ValueError as parse_error:
            # Keep the transcript on screen and explain the failure instead of
            # surfacing a raw decoding traceback.
            error_msg = "## Error\nThe AI response was not valid JSON. Please try again."
            yield log(f"Failed to parse AI response: {parse_error}"), transcribed_text, gr.Markdown(error_msg)
            return

        summary_markdown = build_summary_markdown(data)
        yield log("Summarization complete."), transcribed_text, summary_markdown

    finally:
        # Only files this function downloaded are removed; uploads belong to Gradio.
        if is_downloaded and os.path.exists(filepath):
            os.remove(filepath)

def extract_youtube_video_id(url):
    """Return the video ID found in a YouTube URL, or ``None``.

    Recognises Shorts links (``youtube.com/shorts/<id>``), watch links
    (``watch?v=<id>``) and short links (``youtu.be/<id>``). The ID is the
    leading run of letters, digits, ``-`` and ``_`` after the marker, so
    query strings, fragments and any markup characters are never included.

    Args:
        url: Text entered by the user; may be empty or ``None``.

    Returns:
        The video ID string, or ``None`` when no ID can be extracted.
    """
    import re  # local import: only this helper needs it

    if not url:
        return None
    for marker in ("youtube.com/shorts/", "watch?v=", "youtu.be/"):
        _, found, tail = url.partition(marker)
        if found:
            id_match = re.match(r"[A-Za-z0-9_-]+", tail)
            return id_match.group(0) if id_match else None
    return None


def update_video_preview(url):
    """Build the update for the embedded YouTube preview.

    Args:
        url: Current content of the YouTube URL textbox.

    Returns:
        A ``gr.update`` that shows an ``<iframe>`` embed when a video ID can
        be extracted from ``url``, and hides the preview otherwise.
    """
    video_id = extract_youtube_video_id(url)
    if not video_id:
        return gr.update(value=None, visible=False)

    # video_id is restricted to URL-safe ID characters by the helper above,
    # so it is safe to place inside the src attribute.
    embed_url = f"https://www.youtube.com/embed/{video_id}"
    iframe_html = f'<iframe width="100%" height="315" src="{embed_url}" frameborder="0" allowfullscreen></iframe>'
    return gr.update(value=iframe_html, visible=True)

# Stylesheet passed to gr.Blocks. The .blog-output rules style the generated
# article; the @import pulls in the Sarabun web font named in font-family.
# The import URL must be a plain URL (it previously contained Markdown link
# syntax, which made the rule invalid and left the font unloaded).
css = """
@import url('https://fonts.googleapis.com/css2?family=Sarabun:wght@400;700&display=swap');
.blog-output { font-family: 'Sarabun', sans-serif; line-height: 1.8; max-width: 800px; margin: auto; padding: 2rem; border-radius: 12px; background-color: #ffffff; border: 1px solid #e5e7eb; }
.blog-output h1 { font-size: 2.2em; font-weight: 700; border-bottom: 2px solid #f3f4f6; padding-bottom: 15px; margin-bottom: 25px; color: #111827; }
.blog-output h2 { font-size: 1.6em; font-weight: 700; margin-top: 40px; margin-bottom: 20px; color: #1f2937; }
.blog-output p { font-size: 1.1em; margin-bottom: 20px; color: #374151; }
.blog-output ul { padding-left: 25px; list-style-type: disc; }
.blog-output li { margin-bottom: 12px; padding-left: 5px; }
"""

# --- User interface -------------------------------------------------------
# Left column: inputs (upload tab / YouTube tab), submit button, video
# preview and the process log. Right column: the generated article and the
# collapsible full transcription.
app_theme = gr.themes.Soft(primary_hue="blue", secondary_hue="sky")

# The YouTube textbox is labelled and enabled according to the DNS probe
# performed at import time.
if YOUTUBE_REACHABLE:
    url_label = "YouTube URL"
    url_placeholder = "Paste a YouTube link here..."
else:
    url_label = "YouTube URL (Disabled)"
    url_placeholder = "YouTube is not reachable in this environment."

with gr.Blocks(theme=app_theme, css=css) as demo:
    gr.Markdown(
        """
        # 🎙️ Audio to Blog Summarizer ✒️
        Upload an audio file (MP3, WAV) or paste a YouTube link to transcribe it to Thai text and summarize the content into blog-style article using ASR and LLM.
        """
    )

    with gr.Row():
        # Input side.
        with gr.Column(scale=1):
            with gr.Tabs():
                with gr.TabItem("⬆️ Upload Audio"):
                    audio_file_input = gr.Audio(
                        label="Upload Audio File",
                        type="filepath",
                    )
                with gr.TabItem("🔗 YouTube Link"):
                    youtube_url_input = gr.Textbox(
                        label=url_label,
                        placeholder=url_placeholder,
                        interactive=YOUTUBE_REACHABLE,
                    )
            submit_button = gr.Button("🚀 Generate Blog Post", variant="primary")
            video_preview = gr.HTML(visible=False)
            with gr.Accordion("📝 View Process Log", open=True):
                log_output = gr.Textbox(label="Log", interactive=False, lines=10)

        # Output side.
        with gr.Column(scale=2):
            gr.Markdown("## ✨ Article Output")
            blog_summary_output = gr.Markdown(elem_classes=["blog-output"])
            with gr.Accordion("📜 View Full Transcription", open=False):
                transcription_output = gr.Textbox(label="Full Text", interactive=False, lines=10)

    # The generator streams (log, transcription, summary) into these outputs.
    submit_button.click(
        fn=transcribe_and_summarize,
        inputs=[audio_file_input, youtube_url_input],
        outputs=[log_output, transcription_output, blog_summary_output],
    )

    # Refresh the embedded preview whenever the URL changes and once on page
    # load, but only when YouTube is reachable.
    if YOUTUBE_REACHABLE:
        for register_listener in (youtube_url_input.change, demo.load):
            register_listener(
                fn=update_video_preview,
                inputs=youtube_url_input,
                outputs=video_preview,
            )

if __name__ == "__main__":
    demo.launch(debug=True)