""" NotebookLM-Py Gradio Web UI 基于 notebooklm-py 的 Web 界面 """ import asyncio import os import gradio as gr from notebooklm import NotebookLMClient from contextlib import asynccontextmanager # 全局状态 _current_notebook_id = None @asynccontextmanager async def get_client(): """获取 NotebookLM 客户端(使用 async with 上下文)""" async with await NotebookLMClient.from_storage() as client: yield client # ==================== 笔记本操作 ==================== async def list_notebooks(): """列出所有笔记本""" try: async with get_client() as client: notebooks = await client.notebooks.list() if not notebooks: return "📭 没有找到笔记本", [] # 格式化列表 result = "📚 **笔记本列表**\n\n" choices = [] for nb in notebooks: result += f"- **{nb.title}** (`{nb.id}`)\n" choices.append((nb.title, nb.id)) return result, choices except Exception as e: return f"❌ 错误: {str(e)}", [] async def create_notebook(title: str): """创建新笔记本""" global _current_notebook_id if not title.strip(): return "⚠️ 请输入笔记本标题", None try: async with get_client() as client: nb = await client.notebooks.create(title.strip()) _current_notebook_id = nb.id return f"✅ 创建成功: **{nb.title}** (`{nb.id}`)", nb.id except Exception as e: return f"❌ 错误: {str(e)}", None async def select_notebook(notebook_id: str): """选择当前笔记本""" global _current_notebook_id if not notebook_id: return "⚠️ 请选择笔记本" _current_notebook_id = notebook_id return f"✅ 已选择笔记本: `{notebook_id}`" # ==================== 资源操作 ==================== async def add_url_source(url: str): """添加 URL 资源""" global _current_notebook_id if not _current_notebook_id: return "⚠️ 请先选择笔记本" if not url.strip(): return "⚠️ 请输入 URL" try: async with get_client() as client: await client.sources.add_url(_current_notebook_id, url.strip(), wait=True) return f"✅ 已添加 URL: {url}" except Exception as e: return f"❌ 错误: {str(e)}" async def list_sources(): """列出当前笔记本的资源""" global _current_notebook_id if not _current_notebook_id: return "⚠️ 请先选择笔记本" try: async with get_client() as client: sources = await client.sources.list(_current_notebook_id) if not sources: return "📭 没有资源" result = "📎 **资源列表**\n\n" for src in sources: result += f"- {src.title} ({src.source_type})\n" return result except Exception as e: return f"❌ 错误: {str(e)}" # ==================== 对话操作 ==================== async def chat_with_sources(question: str, history: list): """与资源对话""" global _current_notebook_id if not _current_notebook_id: return history + [{"role": "assistant", "content": "⚠️ 请先选择笔记本"}] if not question.strip(): return history try: async with get_client() as client: result = await client.chat.ask(_current_notebook_id, question.strip()) new_history = history + [ {"role": "user", "content": question}, {"role": "assistant", "content": result.answer} ] return new_history except Exception as e: return history + [ {"role": "user", "content": question}, {"role": "assistant", "content": f"❌ 错误: {str(e)}"} ] # ==================== 内容生成 ==================== async def generate_audio(instructions: str): """生成音频播客""" global _current_notebook_id if not _current_notebook_id: return "⚠️ 请先选择笔记本", None try: async with get_client() as client: status = await client.artifacts.generate_audio( _current_notebook_id, instructions=instructions if instructions.strip() else None ) await client.artifacts.wait_for_completion(_current_notebook_id, status.task_id) # 下载音频 output_path = "/tmp/podcast.mp3" await client.artifacts.download_audio(_current_notebook_id, output_path) return "✅ 音频生成成功!", output_path except Exception as e: return f"❌ 错误: {str(e)}", None async def generate_quiz(difficulty: str): """生成测验""" global _current_notebook_id if not _current_notebook_id: return "⚠️ 请先选择笔记本" try: async with get_client() as client: status = await client.artifacts.generate_quiz( _current_notebook_id, difficulty=difficulty if difficulty else None ) await client.artifacts.wait_for_completion(_current_notebook_id, status.task_id) # 下载测验 output_path = "/tmp/quiz.md" await client.artifacts.download_quiz(_current_notebook_id, output_path, output_format="markdown") with open(output_path, "r", encoding="utf-8") as f: content = f.read() return f"✅ 测验生成成功!\n\n{content}" except Exception as e: return f"❌ 错误: {str(e)}" async def generate_flashcards(): """生成闪卡""" global _current_notebook_id if not _current_notebook_id: return "⚠️ 请先选择笔记本" try: async with get_client() as client: status = await client.artifacts.generate_flashcards(_current_notebook_id) await client.artifacts.wait_for_completion(_current_notebook_id, status.task_id) # 下载闪卡 output_path = "/tmp/flashcards.md" await client.artifacts.download_flashcards(_current_notebook_id, output_path, output_format="markdown") with open(output_path, "r", encoding="utf-8") as f: content = f.read() return f"✅ 闪卡生成成功!\n\n{content}" except Exception as e: return f"❌ 错误: {str(e)}" # ==================== Gradio UI ==================== def create_ui(): """创建 Gradio 界面""" with gr.Blocks(title="NotebookLM-Py") as demo: gr.Markdown( """ # 📓 NotebookLM-Py Web UI 基于 [notebooklm-py](https://github.com/teng-lin/notebooklm-py) 的 Web 界面, 让你可以通过浏览器使用 Google NotebookLM 的全部功能。 """, elem_classes=["main-title"] ) with gr.Tabs(): # ===== Tab 1: 笔记本管理 ===== with gr.TabItem("📚 笔记本"): with gr.Row(): with gr.Column(): gr.Markdown("### 创建新笔记本") new_title = gr.Textbox(label="标题", placeholder="输入笔记本标题...") create_btn = gr.Button("创建", variant="primary") create_result = gr.Markdown() with gr.Column(): gr.Markdown("### 选择笔记本") refresh_btn = gr.Button("🔄 刷新列表") notebook_list = gr.Markdown() notebook_dropdown = gr.Dropdown(label="选择", choices=[]) select_btn = gr.Button("选择此笔记本", variant="secondary") select_result = gr.Markdown() # 事件绑定 create_btn.click( lambda t: asyncio.run(create_notebook(t)), inputs=[new_title], outputs=[create_result, notebook_dropdown] ) refresh_btn.click( lambda: asyncio.run(list_notebooks()), outputs=[notebook_list, notebook_dropdown] ) select_btn.click( lambda n: asyncio.run(select_notebook(n)), inputs=[notebook_dropdown], outputs=[select_result] ) # ===== Tab 2: 资源管理 ===== with gr.TabItem("📎 资源"): with gr.Row(): with gr.Column(): gr.Markdown("### 添加 URL 资源") url_input = gr.Textbox(label="URL", placeholder="https://...") add_url_btn = gr.Button("添加 URL", variant="primary") add_result = gr.Markdown() with gr.Column(): gr.Markdown("### 当前资源") list_sources_btn = gr.Button("🔄 刷新资源列表") sources_list = gr.Markdown() add_url_btn.click( lambda u: asyncio.run(add_url_source(u)), inputs=[url_input], outputs=[add_result] ) list_sources_btn.click( lambda: asyncio.run(list_sources()), outputs=[sources_list] ) # ===== Tab 3: 对话 ===== with gr.TabItem("💬 对话"): chatbot = gr.Chatbot(label="与资源对话", height=400) with gr.Row(): chat_input = gr.Textbox( label="提问", placeholder="输入你的问题...", scale=4 ) chat_btn = gr.Button("发送", variant="primary", scale=1) chat_btn.click( lambda q, h: asyncio.run(chat_with_sources(q, h)), inputs=[chat_input, chatbot], outputs=[chatbot] ) chat_input.submit( lambda q, h: asyncio.run(chat_with_sources(q, h)), inputs=[chat_input, chatbot], outputs=[chatbot] ) # ===== Tab 4: 内容生成 ===== with gr.TabItem("🎙️ 生成"): gr.Markdown("### 生成内容") with gr.Accordion("🎧 音频播客", open=True): audio_instructions = gr.Textbox( label="指令(可选)", placeholder="例如:让它更有趣..." ) audio_btn = gr.Button("生成音频", variant="primary") audio_status = gr.Markdown() audio_output = gr.Audio(label="生成的音频") audio_btn.click( lambda i: asyncio.run(generate_audio(i)), inputs=[audio_instructions], outputs=[audio_status, audio_output] ) with gr.Accordion("📝 测验", open=False): quiz_difficulty = gr.Dropdown( label="难度", choices=["easy", "medium", "hard"], value="medium" ) quiz_btn = gr.Button("生成测验", variant="primary") quiz_output = gr.Markdown() quiz_btn.click( lambda d: asyncio.run(generate_quiz(d)), inputs=[quiz_difficulty], outputs=[quiz_output] ) with gr.Accordion("🃏 闪卡", open=False): flashcard_btn = gr.Button("生成闪卡", variant="primary") flashcard_output = gr.Markdown() flashcard_btn.click( lambda: asyncio.run(generate_flashcards()), outputs=[flashcard_output] ) gr.Markdown( """ --- ⚠️ **注意**:这是使用非官方 API 的社区项目。请遵守 Google 的服务条款。 """ ) return demo if __name__ == "__main__": from api import app as fastapi_app import gradio as gr demo = create_ui() # 将 FastAPI 挂载到 Gradio app = gr.mount_gradio_app(fastapi_app, demo, path="/") import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)