| | |
| |
|
| | from typing import Container |
| | from config.config import PASSWORD |
| | import gradio as gr |
| | import os |
| | import shutil |
| | import tempfile |
| | from google import genai |
| | from google.genai import types |
| | import yt_dlp |
| |
|
| | from initializer import initialize_clients, initialize_password |
| |
|
| | |
# Create the shared service handles once, at import time.
GCS_SERVICE, GENAI_CLIENT = initialize_clients()
# Convenience alias for the `.client` attribute of the GCS service object.
GCS_CLIENT = GCS_SERVICE.client

# Password that process_with_auth compares user input against. This
# assignment rebinds the PASSWORD name imported from config.config above.
PASSWORD = initialize_password()
def process_with_auth(password, file_list, file_display):
    """Check the password, then process the sources selected in the UI.

    Args:
        password: Text entered in the password box.
        file_list: Every known source. An entry containing ``"|||"`` is a
            ``"<title>|||<url>"`` YouTube record; any other entry is treated
            as a file path.
        file_display: The display names currently selected (video titles or
            file base names).

    Returns:
        A 3-tuple ``(status_text, transcript_text, update)`` where ``update``
        is a ``gr.update`` that shows the component wired as the third output
        only after processing actually ran.
    """
    import hmac

    if not file_display:
        return "請選擇要處理的文件", "", gr.update(visible=False)

    # Compare in constant time so response timing does not leak how much of
    # the password matched. compare_digest only accepts ASCII str, so compare
    # the UTF-8 bytes; fall back to plain equality for non-string values.
    if isinstance(password, str) and isinstance(PASSWORD, str):
        authorized = hmac.compare_digest(
            password.encode("utf-8"), PASSWORD.encode("utf-8")
        )
    else:
        authorized = password == PASSWORD
    if not authorized:
        return "請輸入正確的密碼", "", gr.update(visible=False)

    def display_name(item):
        # Same naming rule the checkbox list uses: the title for YouTube
        # records, the base name for file paths.
        if "|||" in item:
            return item.split("|||")[0]
        return os.path.basename(item)

    selected_files = [
        item for item in file_list if display_name(item) in file_display
    ]

    result_text, transcript_text = process_all_files(selected_files)
    return result_text, transcript_text, gr.update(visible=True)
| |
|
| | |
def toggle_visibility(toggle_value):
    """Return a component update whose ``visible`` flag mirrors ``toggle_value``."""
    visibility_update = gr.update(visible=toggle_value)
    return visibility_update
| |
|
| | |
def _normalize_youtube_url(link):
    """Expand a pasted link, scheme-less URL, path or bare video id to a full URL."""
    link = link.strip()
    if link.startswith(("http://", "https://")):
        return link
    # A host without a scheme only needs the scheme; treating it as a path
    # would produce "https://www.youtube.com/www.youtube.com/...".
    if link.startswith(
        ("www.", "m.youtube.com", "music.youtube.com", "youtube.com", "youtu.be")
    ):
        return f"https://{link}"
    if "watch?v=" in link:
        return f"https://www.youtube.com/{link.lstrip('/')}"
    return f"https://www.youtube.com/watch?v={link}"


def add_youtube_to_list(youtube_link, file_list):
    """Append a YouTube source to the source list.

    Args:
        youtube_link: A URL, a ``watch?v=...`` path, or a bare video id.
        file_list: Current list of sources (file paths and
            ``"<title>|||<url>"`` records). It is not modified.

    Returns:
        ``(new_file_list, "")``. The second element clears the link textbox.
        A blank or missing link returns the list unchanged, so both code
        paths hand the caller a plain list.
    """
    link = (youtube_link or "").strip()
    if not link:
        return file_list, ""

    # Normalise first so the title lookup and the stored record use the same URL.
    url = _normalize_youtube_url(link)
    title = str(get_youtube_title(url))
    # "|||" separates title from URL in stored records; keep it out of the title.
    title = title.replace("|||", "|")

    # Build a new list instead of appending in place, so the caller receives
    # a distinct value and the input argument stays untouched.
    updated_list = [*file_list, f"{title}|||{url}"]
    display_list = [
        item.split("|||")[0] if "|||" in item else os.path.basename(item)
        for item in updated_list
    ]
    print(f"File list: {updated_list}")
    print(f"Display list: {display_list}")
    return updated_list, ""
| |
|
def get_youtube_title_from_gemini(url):
    """Ask Gemini for the title of the video at ``url``.

    Args:
        url: Video URL handed to the model as a ``video/*`` URI part.

    Returns:
        The model's reply with surrounding whitespace removed, or ``url``
        itself when the reply is empty or any exception is raised. Failures
        are printed rather than propagated.
    """
    print(f"\n開始獲取 YouTube 標題: {url}")
    try:
        print("初始化 Gemini 模型設定...")
        video = types.Part.from_uri(
            file_uri=url,
            mime_type="video/*",
        )

        print("開始生成標題...")
        response = GENAI_CLIENT.models.generate_content(
            model="gemini-2.0-flash-exp",
            contents=[
                types.Content(
                    role="user",
                    parts=[
                        video,
                        # Pass `text` by keyword: it is keyword-only in
                        # current google-genai releases and accepted by
                        # older ones too.
                        types.Part.from_text(
                            text="請只回傳這個影片的標題,不要加入其他任何文字。"
                        ),
                    ],
                )
            ],
        )

        title = (response.text or "").strip() if response else ""
        if title:
            print(f"成功獲取標題: {title}")
            return title
        # An empty or whitespace-only reply is not a usable title.
        return url
    except Exception as e:
        # Best-effort lookup: any failure falls back to the URL.
        print(f"Gemini 獲取標題失敗: {str(e)}")
        return url
| |
|
def get_youtube_title(url):
    """Look up a display title for a YouTube video.

    yt-dlp metadata is tried first, then Gemini. If neither yields a title,
    or anything raises, the (possibly expanded) URL is returned instead.
    """
    try:
        # Expand a bare id or a "watch?v=..." path into a full URL.
        if not url.startswith('http'):
            tail = url if 'watch?v=' in url else f'watch?v={url}'
            url = f'https://www.youtube.com/{tail}'

        # First attempt: metadata only, nothing is downloaded.
        try:
            options = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': True
            }
            with yt_dlp.YoutubeDL(options) as downloader:
                metadata = downloader.extract_info(url, download=False)
                video_title = metadata.get('title', '')
                if video_title:
                    print(f"YouTube title from yt-dlp: {video_title}")
                    return video_title
        except Exception as ydl_error:
            print(f"yt-dlp 獲取標題失敗: {str(ydl_error)}")

        # Second attempt: ask the model. It returns the URL when it fails.
        print("嘗試使用 Gemini 獲取標題...")
        gemini_title = get_youtube_title_from_gemini(url)
        if not gemini_title or gemini_title == url:
            return url

        print(f"YouTube title from Gemini: {gemini_title}")
        return gemini_title
    except Exception as error:
        print(f"獲取標題失敗: {str(error)}")
        return url
| |
|
| | |
def add_to_file_list(file, file_list):
    """Copy an uploaded file into the temp directory and add it to the sources.

    Args:
        file: The upload, either an object with a ``name`` attribute holding
            its path or a plain path string. Falsy means nothing was uploaded.
        file_list: Current list of sources. It is not modified.

    Returns:
        ``(new_file_list, None)``. The first element is always a plain list,
        the same shape ``add_youtube_to_list`` returns, so it can be stored
        directly as the source-list state. ``None`` clears the upload widget.
    """
    if not file:
        return file_list, None

    # Accept both file-like wrappers (with `.name`) and plain path strings.
    source_path = getattr(file, "name", file)
    temp_path = os.path.join(tempfile.gettempdir(), os.path.basename(source_path))
    try:
        shutil.copy(source_path, temp_path)
    except shutil.SameFileError:
        # The upload already lives at the destination; nothing to copy.
        pass

    # Build a new list rather than appending in place, so the caller receives
    # a distinct value and the input argument stays untouched.
    return [*file_list, temp_path], None
| |
|
| | |
# Encodings tried, in order, when a file is not valid UTF-8.
_FALLBACK_ENCODINGS = ('big5', 'gbk', 'shift-jis')


def _transcribe_youtube_entry(item):
    """Return ``(section_or_None, status)`` for a ``"<title>|||<url>"`` record."""
    # Split on the last separator so extra "|||" in the title cannot break
    # the two-way unpacking; the URL is always the final field.
    title, url = item.rsplit("|||", 1)
    print(f"處理 YouTube: {title}")
    try:
        transcript = generate_transcript(url)
    except Exception as e:
        if "無法取得影片資訊" in str(e):
            # Keep whatever partial transcript the exception carries, if any.
            partial = getattr(e, 'transcript', '')
            return (
                f"=== YouTube 影片 ===\n{partial}",
                f"🟡 影片資訊不完整,但已處理內容:{url}",
            )
        return None, f"🔴 處理失敗:{title}({str(e)})"

    if transcript:
        return (
            f"=== {title} ===\n{transcript}",
            f"🟢 成功處理 YouTube 影片:{title}",
        )
    return None, f"🔴 無法獲取影片逐字稿:{title}"


def _read_file_section(path):
    """Return ``(section_or_None, status)`` for a text file, trying several encodings."""
    filename = os.path.basename(path)
    print(f"處理文件: {filename}")
    try:
        for encoding in ('utf-8',) + _FALLBACK_ENCODINGS:
            try:
                with open(path, 'r', encoding=encoding) as f:
                    content = f.read()
            except UnicodeDecodeError:
                # Wrong encoding guess; try the next one. Other errors
                # (missing file, permissions) are reported below.
                continue
            break
        else:
            return None, f"🔴 無法讀取文件內容:{filename}(編碼問題)"
    except Exception as e:
        return None, f"🔴 讀取文件失敗:{filename}({str(e)})"

    if encoding != 'utf-8':
        return (
            f"=== {filename} ===\n{content}",
            f"🟡 使用 {encoding} 編碼成功讀取文件:{filename}",
        )

    try:
        # Undo mojibake where UTF-8 bytes of the name were decoded as Latin-1.
        # NOTE(review): a name that contains characters outside Latin-1 also
        # lands in the fallback branch below; confirm that is intended.
        decoded_name = filename.encode('latin1').decode('utf-8')
    except UnicodeError:
        return (
            f"=== 文件內容 ===\n{content}",
            f"🟡 文件名稱無法正確顯示,但已處理內容:{filename}",
        )
    return (
        f"=== {decoded_name} ===\n{content}",
        f"🟢 成功處理文件:{decoded_name}",
    )


def process_all_files(file_list):
    """Process every selected source and combine the resulting text.

    Args:
        file_list: Sources to process. An entry containing ``"|||"`` is a
            ``"<title>|||<url>"`` YouTube record that is transcribed; any
            other entry is read as a text file.

    Returns:
        ``(status_text, combined_text)``. ``status_text`` lists one status
        line per source. ``combined_text`` joins the successful sections with
        blank lines and is ``""`` when nothing succeeded; in that case the
        status text still carries the per-source failure reasons.
    """
    if not file_list:
        return "請選擇要處理的文件", ""

    all_text = []
    status_messages = []

    for item in file_list:
        try:
            handler = _transcribe_youtube_entry if "|||" in item else _read_file_section
            section, status = handler(item)
        except Exception as e:
            # Last-resort guard so one bad source never aborts the batch.
            status_messages.append(f"🔴 處理失敗:{item}({str(e)})")
            continue
        if section is not None:
            all_text.append(section)
        status_messages.append(status)

    status_text = "\n".join(status_messages)
    if not all_text:
        # Surface why every source failed instead of dropping the reasons.
        return f"❌ 沒有成功處理任何文件\n{status_text}", ""

    combined_text = "\n\n".join(all_text)
    return f"處理完成\n{status_text}", combined_text
| |
|
| | |
# Canned replies for the placeholder chat; built once instead of on every call.
_MOCK_ANSWERS = {
    "文件的核心觀點是什麼?": "這份文件的核心觀點是關於人工智慧如何提升工作效率。",
    "有哪些關鍵詞或數據?": "關鍵詞包括:人工智慧、工作效率、數據分析。",
    "文件的摘要是什麼?": "這份文件討論了如何利用人工智慧工具,提升企業的運營效率和決策速度。",
}


def mock_question_answer(question, history):
    """Answer a question from a fixed lookup table (placeholder chat backend).

    Args:
        question: Text typed by the user. Surrounding whitespace is ignored
            for the lookup.
        history: List of ``{"role", "content"}`` message dicts. New messages
            are appended to this same list, so a caller that keeps a
            reference to it sees them.

    Returns:
        ``(history, "")``. The empty string clears the question box. A blank
        question leaves the history untouched.
    """
    if history is None:
        history = []

    asked = (question or "").strip()
    if not asked:
        # Nothing was asked; do not add empty turns to the conversation.
        return history, ""

    response = _MOCK_ANSWERS.get(asked, "抱歉,我無法回答這個問題。請嘗試其他問題!")
    history.append({"role": "user", "content": asked})
    history.append({"role": "assistant", "content": response})
    return history, ""
| |
|
| |
|
| | |
def generate_transcript(youtube_link):
    """Stream a timestamped transcript of a YouTube video from Gemini.

    Args:
        youtube_link: Video URL handed to the model as a ``video/*`` URI part.

    Returns:
        The concatenated text of every streamed part, or ``""`` if the stream
        produced no text.

    Raises:
        Exception: Any error raised while building the request or consuming
            the stream is printed and then re-raised unchanged.
    """
    print(f"\n開始生成 YouTube 逐字稿: {youtube_link}")
    try:
        print("初始化 Gemini 模型設定...")
        video = types.Part.from_uri(
            file_uri=youtube_link,
            mime_type="video/*",
        )

        model = "gemini-2.0-flash-exp"
        contents = [
            types.Content(
                role="user",
                parts=[
                    video,
                    # Pass `text` by keyword: it is keyword-only in current
                    # google-genai releases and accepted by older ones too.
                    types.Part.from_text(text="""請給我帶時間軸的逐字稿,請統一用 zhTW語言"""),
                ],
            )
        ]
        generate_content_config = types.GenerateContentConfig(
            temperature=1,
            top_p=0.95,
            max_output_tokens=8192,
            response_modalities=["TEXT"],
            safety_settings=[
                types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="OFF"),
                types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="OFF"),
                types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="OFF"),
                types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="OFF"),
            ],
        )

        print("開始串流生成逐字稿...")
        pieces = []
        for chunk in GENAI_CLIENT.models.generate_content_stream(
            model=model,
            contents=contents,
            config=generate_content_config,
        ):
            # Any level may be missing or None on a given chunk; `or []`
            # turns each of those cases into "nothing to read" instead of a
            # TypeError from iterating None.
            for candidate in getattr(chunk, 'candidates', None) or []:
                content = getattr(candidate, 'content', None)
                for part in getattr(content, 'parts', None) or []:
                    text = getattr(part, 'text', None)
                    if text:
                        pieces.append(text)
                        # Progress dot per received text part.
                        print(".", end="", flush=True)

        print("\n逐字稿生成完成!")
        return "".join(pieces)
    except Exception as e:
        print(f"\n生成逐字稿時發生錯誤: {str(e)}")
        raise
| |
|
def generate_summary(transcript):
    """Generate a Markdown summary of ``transcript`` using Gemini.

    Args:
        transcript: Transcript or document text, interpolated into the prompt.

    Returns:
        The ``text`` attribute of the model response.

    Raises:
        Exception: Any error from the model call is printed and re-raised.
    """
    try:
        print("\n開始生成摘要...")
        model = "gemini-2.0-flash-exp"
        # NOTE(review): the leading whitespace inside this prompt was
        # reconstructed; confirm it matches the intended layout.
        prompt = f"""
        Inputs:
        - 請根據以下逐字稿或文本生成重點摘要:{transcript}

        Rules:
        - 如果有課程名稱,請圍繞「課程名稱」為學習重點,進行重點整理,不要整理跟情境故事相關的問題
        - 整體摘要在一百字以內
        - 重點概念列出 bullet points,至少三個,最多五個
        - 以及可能的結論與結尾延伸小問題提供學生作反思
        - 敘述中,請把數學或是專業術語,用 Latex 包覆($...$)
        - 加減乘除、根號、次方等等的運算式口語也換成 LATEX 數學符號

        Example:
        請以下列 markdown 格式輸出:
        ## 🌟 主題: (如果沒有 title 就省略)
        ## 📚 整體摘要
        - (一個 bullet point....)

        ## 🔖 重點概念
        - xxx
        - xxx
        - xxx

        ## 💡 為什麼我們要學這個?
        - (一個 bullet point....)

        ## ❓ 延伸小問題
        - (一個 bullet point....請圍繞學習重點,進行重點延伸思考,不要整理跟情境故事相關的問題)
        """
        contents = [
            types.Content(
                role="user",
                parts=[
                    # Pass `text` by keyword: it is keyword-only in current
                    # google-genai releases and accepted by older ones too.
                    types.Part.from_text(text=prompt),
                ],
            )
        ]

        response = GENAI_CLIENT.models.generate_content(
            model=model,
            contents=contents,
        )

        print("摘要生成完成!")
        summary = response.text
        return summary
    except Exception as e:
        print(f"\n生成摘要時發生錯誤: {str(e)}")
        raise
| |
|
def on_summary_click(transcript):
    """Return a summary of ``transcript``, or a hint when there is nothing to summarise.

    Args:
        transcript: Text currently held by the transcript display.

    Returns:
        The generated summary, or a fixed prompt asking the user to process a
        source first when ``transcript`` is missing, empty or whitespace-only.
    """
    # Whitespace-only text carries no content, so skip the model call.
    if not transcript or not transcript.strip():
        return "請先上傳文件或輸入 YouTube 連結並處理完成後再生成摘要。"

    return generate_summary(transcript)
| |
|
# Build the Gradio interface: components are declared and their event
# listeners registered inside the Blocks context.
# NOTE(review): the indentation of this section was reconstructed from a
# flattened listing. Confirm the nesting of rows, columns and tabs (in
# particular that the checkbox group and the process button sit in the source
# column rather than inside the upload tab).
with gr.Blocks() as demo:

    # Header row: title and the password box passed to process_with_auth.
    with gr.Row():
        gr.Markdown("# AI Notes Assistant")
        password_input = gr.Textbox(label="password")

    # Checkboxes that show or hide the three main columns.
    with gr.Row():
        source_toggle = gr.Checkbox(label="顯示來源選單", value=True)
        chat_toggle = gr.Checkbox(label="顯示對話區域", value=True)
        feature_toggle = gr.Checkbox(label="顯示功能卡片", value=True)

    with gr.Row():
        with gr.Column(visible=True) as source_column:
            gr.Markdown("### 來源選單")

            # Per-session list of sources (file paths and "title|||url" records).
            file_list = gr.State([])
            # Not referenced by any listener in this block.
            file_display = gr.State([])

            with gr.Tab("YouTube 連結"):
                youtube_link = gr.Textbox(label="輸入 YouTube 連結")
                add_youtube_button = gr.Button("添加到來源列表")
                # The handler's two return values go to the source-list state
                # and the link textbox.
                add_youtube_button.click(add_youtube_to_list, inputs=[youtube_link, file_list], outputs=[file_list, youtube_link])

            with gr.Tab("上傳檔案(TODO)"):
                upload_file = gr.File(label="從電腦添加文件", file_types=[".txt", ".pdf", ".docx"])
                add_file_button = gr.Button("添加到來源列表")
                # The handler's two return values go to the source-list state
                # and the upload widget.
                add_file_button.click(add_to_file_list, inputs=[upload_file, file_list], outputs=[file_list, upload_file])

            file_display_input = gr.CheckboxGroup(label="已上傳的文件", interactive=True)

            def update_display(file_list):
                """Rebuild the checkbox choices from the source list and clear the selection."""
                display_list = [item.split("|||")[0] if "|||" in item else os.path.basename(item) for item in file_list]
                print(f"Updating display with: {display_list}")
                return gr.update(choices=display_list, value=[])

            # Refresh the checkbox group whenever the source-list state changes.
            file_list.change(update_display, inputs=file_list, outputs=file_display_input)

            process_files_button = gr.Button("處理檔案")
            rag_result = gr.Textbox(label="處理狀態", interactive=False)

        with gr.Column(visible=True) as chat_column:
            gr.Markdown("### 對話區域")
            chatbot = gr.Chatbot(label="聊天記錄", type="messages")
            question = gr.Textbox(label="輸入問題,例如:文件的核心觀點是什麼?")
            ask_button = gr.Button("提問")

        with gr.Column(visible=True) as feature_column:
            gr.Markdown("### 功能卡片")
            with gr.Tab("摘要生成"):
                # Hidden initially; process_with_auth's third return value
                # updates its visibility.
                summary_button = gr.Button("生成摘要", visible=False)
                summary_output = gr.Markdown(
                    label="摘要",
                    show_label=True,
                    show_copy_button=True,
                    container=True
                )
            with gr.Tab("逐字稿"):
                transcript_display = gr.Textbox(
                    label="YouTube 逐字稿",
                    interactive=False,
                    lines=20,
                    show_copy_button=True,
                    placeholder="處理 YouTube 影片後,逐字稿將顯示在這裡..."
                )
            with gr.Tab("其他功能"):
                gr.Markdown("此處可以添加更多功能卡片")

    # Each checkbox drives the visibility of its column.
    source_toggle.change(toggle_visibility, inputs=source_toggle, outputs=source_column)
    chat_toggle.change(toggle_visibility, inputs=chat_toggle, outputs=chat_column)
    feature_toggle.change(toggle_visibility, inputs=feature_toggle, outputs=feature_column)

    # Process the selected sources, then chain a summary of whatever ended up
    # in the transcript box.
    process_files_button.click(
        fn=process_with_auth,
        inputs=[password_input, file_list, file_display_input],
        outputs=[
            rag_result,
            transcript_display,
            summary_button
        ]
    ).then(
        fn=on_summary_click,
        inputs=[transcript_display],
        outputs=[summary_output]
    )

    # Chat history state: passed to the handler as an input but not listed
    # among this listener's outputs.
    history = gr.State([])
    ask_button.click(mock_question_answer, inputs=[question, history], outputs=[chatbot, question])
    # Manual re-run of the summary from the current transcript.
    summary_button.click(
        fn=on_summary_click,
        inputs=[transcript_display],
        outputs=[summary_output]
    )

# Start the app when the module runs; share=True asks Gradio for a publicly
# reachable share link.
demo.launch(share=True)