notebooklm-py / app.py
ASEM12345's picture
Upload app.py with huggingface_hub
b6d2cbc verified
"""
NotebookLM-Py Gradio Web UI
基于 notebooklm-py 的 Web 界面
"""
import asyncio
import os
import tempfile
from contextlib import asynccontextmanager

import gradio as gr
from notebooklm import NotebookLMClient
# Global state: ID of the notebook currently selected in the UI.
# It is written by create_notebook() and select_notebook() and read by the
# source, chat and generation handlers; None means nothing is selected yet.
_current_notebook_id = None
@asynccontextmanager
async def get_client():
    """Yield a ready-to-use NotebookLM client for the span of an ``async with``.

    The client is obtained from ``NotebookLMClient.from_storage()`` and is
    entered as an async context manager, so it is exited again as soon as the
    caller's ``async with get_client()`` block ends.
    """
    # from_storage() is awaited first; its result is the context manager.
    client_cm = await NotebookLMClient.from_storage()
    async with client_cm as client:
        yield client
# ==================== 笔记本操作 ====================
async def list_notebooks():
    """Fetch all notebooks and format them for display.

    Returns:
        A ``(markdown, choices)`` tuple. ``markdown`` is a bulleted list of
        notebook titles and IDs; ``choices`` is a list of ``(title, id)``
        pairs. When no notebook exists, or when any exception is raised,
        ``choices`` is empty and ``markdown`` holds the notice or the error
        text instead.
    """
    try:
        async with get_client() as client:
            notebooks = await client.notebooks.list()
            if not notebooks:
                return "📭 没有找到笔记本", []
            # One pass over the notebooks yields the dropdown pairs; the
            # Markdown bullets are then derived from those pairs.
            entries = [(nb.title, nb.id) for nb in notebooks]
            bullets = "".join(
                f"- **{title}** (`{nb_id}`)\n" for title, nb_id in entries
            )
            return "📚 **笔记本列表**\n\n" + bullets, entries
    except Exception as exc:
        return f"❌ 错误: {exc!s}", []
async def create_notebook(title: str):
    """Create a notebook and make it the currently selected one.

    Args:
        title: Desired notebook title; surrounding whitespace is stripped.

    Returns:
        A ``(message, notebook_id)`` tuple. ``notebook_id`` is ``None`` when
        the title is blank or when creation raises an exception.
    """
    global _current_notebook_id
    cleaned_title = title.strip()
    if not cleaned_title:
        return "⚠️ 请输入笔记本标题", None
    try:
        async with get_client() as client:
            notebook = await client.notebooks.create(cleaned_title)
            # A freshly created notebook becomes the active selection.
            _current_notebook_id = notebook.id
            message = f"✅ 创建成功: **{notebook.title}** (`{notebook.id}`)"
            return message, notebook.id
    except Exception as exc:
        return f"❌ 错误: {exc!s}", None
async def select_notebook(notebook_id: str):
    """Record ``notebook_id`` as the currently selected notebook.

    Args:
        notebook_id: ID chosen in the UI; a falsy value is rejected.

    Returns:
        A status message: a confirmation containing the ID, or a prompt to
        pick a notebook when ``notebook_id`` is falsy (in which case the
        current selection is left unchanged).
    """
    global _current_notebook_id
    if notebook_id:
        _current_notebook_id = notebook_id
        return f"✅ 已选择笔记本: `{notebook_id}`"
    return "⚠️ 请选择笔记本"
# ==================== 资源操作 ====================
async def add_url_source(url: str):
    """Add a URL as a source of the currently selected notebook.

    Args:
        url: Address to add; surrounding whitespace is stripped before it is
            sent to the client.

    Returns:
        A status message: a warning when no notebook is selected or the URL
        is blank, a confirmation on success, or the error text when the
        client call raises.
    """
    if not _current_notebook_id:
        return "⚠️ 请先选择笔记本"
    target_url = url.strip()
    if not target_url:
        return "⚠️ 请输入 URL"
    try:
        async with get_client() as client:
            await client.sources.add_url(_current_notebook_id, target_url, wait=True)
            # The confirmation echoes the URL exactly as the user typed it.
            return f"✅ 已添加 URL: {url}"
    except Exception as exc:
        return f"❌ 错误: {exc!s}"
async def list_sources():
    """List the sources of the currently selected notebook as Markdown.

    Returns:
        A Markdown string with one bullet per source (title and source type),
        a notice when no notebook is selected or the notebook has no sources,
        or the error text when the client call raises.
    """
    if not _current_notebook_id:
        return "⚠️ 请先选择笔记本"
    try:
        async with get_client() as client:
            sources = await client.sources.list(_current_notebook_id)
            if not sources:
                return "📭 没有资源"
            bullets = [f"- {src.title} ({src.source_type})\n" for src in sources]
            return "📎 **资源列表**\n\n" + "".join(bullets)
    except Exception as exc:
        return f"❌ 错误: {exc!s}"
# ==================== 对话操作 ====================
async def chat_with_sources(question: str, history: list):
    """Ask the selected notebook a question and extend the chat history.

    Args:
        question: The user's question. A blank (or ``None``) question is
            ignored and the history is returned as-is.
        history: Prior chat messages as ``{"role": ..., "content": ...}``
            dicts. ``None`` is treated as an empty history.

    Returns:
        A new history list. On success it gains the user message and the
        assistant's answer; when the client call raises, it gains the user
        message and an assistant message carrying the error text; when no
        notebook is selected, it gains only an assistant warning.
    """
    # The chat component may hand over None instead of an empty list;
    # normalise so that the list concatenations below cannot raise TypeError.
    history = [] if history is None else history
    # Read the selection once so the whole request targets a single notebook.
    notebook_id = _current_notebook_id
    if not notebook_id:
        return history + [{"role": "assistant", "content": "⚠️ 请先选择笔记本"}]
    if not question or not question.strip():
        return history
    try:
        async with get_client() as client:
            result = await client.chat.ask(notebook_id, question.strip())
            return history + [
                {"role": "user", "content": question},
                {"role": "assistant", "content": result.answer},
            ]
    except Exception as e:
        # Surface the failure inside the conversation instead of crashing
        # the event handler.
        return history + [
            {"role": "user", "content": question},
            {"role": "assistant", "content": f"❌ 错误: {str(e)}"},
        ]
# ==================== 内容生成 ====================
async def generate_audio(instructions: str):
    """Generate an audio podcast for the selected notebook and download it.

    Args:
        instructions: Optional free-text guidance for the generation. Blank
            or ``None`` is forwarded to the client as ``None``.

    Returns:
        A ``(message, audio_path)`` tuple. ``audio_path`` is the path of the
        downloaded MP3 on success and ``None`` when no notebook is selected
        or when any step raises.
    """
    # Snapshot the selection: generation awaits for a long time, and the
    # global may be changed meanwhile by another event. Every step below must
    # address the same notebook.
    notebook_id = _current_notebook_id
    if not notebook_id:
        return "⚠️ 请先选择笔记本", None
    try:
        async with get_client() as client:
            has_instructions = bool(instructions and instructions.strip())
            status = await client.artifacts.generate_audio(
                notebook_id,
                instructions=instructions if has_instructions else None,
            )
            await client.artifacts.wait_for_completion(notebook_id, status.task_id)
            # Use a unique file per request so overlapping generations do not
            # overwrite each other's audio. The file is intentionally kept on
            # success because the caller receives its path.
            fd, output_path = tempfile.mkstemp(prefix="podcast_", suffix=".mp3")
            os.close(fd)
            try:
                await client.artifacts.download_audio(notebook_id, output_path)
            except BaseException:
                # Do not leave an empty placeholder behind when the download
                # fails or is cancelled.
                try:
                    os.remove(output_path)
                except OSError:
                    pass
                raise
            return "✅ 音频生成成功!", output_path
    except Exception as e:
        return f"❌ 错误: {str(e)}", None
async def generate_quiz(difficulty: str):
    """Generate a quiz for the selected notebook and return it as Markdown.

    Args:
        difficulty: Difficulty label forwarded to the client; a falsy value
            is forwarded as ``None``.

    Returns:
        A string: the success banner followed by the quiz Markdown, a warning
        when no notebook is selected, or the error text when any step raises.
    """
    # Snapshot the selection so that generation, waiting and download all
    # address the same notebook even if the global changes in between.
    notebook_id = _current_notebook_id
    if not notebook_id:
        return "⚠️ 请先选择笔记本"
    try:
        async with get_client() as client:
            status = await client.artifacts.generate_quiz(
                notebook_id,
                difficulty=difficulty if difficulty else None,
            )
            await client.artifacts.wait_for_completion(notebook_id, status.task_id)
            # Download into a private temporary directory: overlapping
            # requests cannot clobber each other's file, and the directory is
            # removed once the content has been read.
            with tempfile.TemporaryDirectory(prefix="notebooklm_quiz_") as workdir:
                output_path = os.path.join(workdir, "quiz.md")
                await client.artifacts.download_quiz(
                    notebook_id, output_path, output_format="markdown"
                )
                with open(output_path, "r", encoding="utf-8") as f:
                    content = f.read()
            return f"✅ 测验生成成功!\n\n{content}"
    except Exception as e:
        return f"❌ 错误: {str(e)}"
async def generate_flashcards():
    """Generate flashcards for the selected notebook and return them as Markdown.

    Returns:
        A string: the success banner followed by the flashcard Markdown, a
        warning when no notebook is selected, or the error text when any step
        raises.
    """
    # Snapshot the selection so that generation, waiting and download all
    # address the same notebook even if the global changes in between.
    notebook_id = _current_notebook_id
    if not notebook_id:
        return "⚠️ 请先选择笔记本"
    try:
        async with get_client() as client:
            status = await client.artifacts.generate_flashcards(notebook_id)
            await client.artifacts.wait_for_completion(notebook_id, status.task_id)
            # Download into a private temporary directory: overlapping
            # requests cannot clobber each other's file, and the directory is
            # removed once the content has been read.
            with tempfile.TemporaryDirectory(prefix="notebooklm_flashcards_") as workdir:
                output_path = os.path.join(workdir, "flashcards.md")
                await client.artifacts.download_flashcards(
                    notebook_id, output_path, output_format="markdown"
                )
                with open(output_path, "r", encoding="utf-8") as f:
                    content = f.read()
            return f"✅ 闪卡生成成功!\n\n{content}"
    except Exception as e:
        return f"❌ 错误: {str(e)}"
# ==================== Gradio UI ====================
def create_ui():
    """Build the Gradio Blocks interface and wire up its event handlers.

    The layout has four tabs: notebook management (create / list / select),
    source management (add URL / list), chat, and content generation (audio,
    quiz, flashcards), followed by a disclaimer footer.

    The async handlers are registered directly as event listeners (Gradio
    awaits coroutine functions itself), so no per-click ``asyncio.run`` is
    needed.

    Returns:
        The assembled ``gr.Blocks`` instance; it is not launched here.
    """

    async def _refresh_notebooks(selected):
        """Reload the notebook list and push the options into the dropdown."""
        listing, choices = await list_notebooks()
        # Returning a bare list to a Dropdown output sets its *value*; the
        # selectable options only change through an explicit update.
        still_listed = any(value == selected for _, value in choices)
        return listing, gr.update(
            choices=choices,
            value=selected if still_listed else None,
        )

    async def _create_and_select(title):
        """Create a notebook, then show it as the dropdown's selection."""
        message, notebook_id = await create_notebook(title)
        if notebook_id is None:
            # Nothing was created: leave the dropdown untouched.
            return message, gr.update()
        # The dropdown can only display the new notebook if it is one of its
        # options, so reload the options before selecting it.
        _, choices = await list_notebooks()
        if not any(value == notebook_id for _, value in choices):
            # Listing failed or does not include the notebook yet; a manual
            # refresh will pick it up later.
            return message, gr.update()
        return message, gr.update(choices=choices, value=notebook_id)

    with gr.Blocks(title="NotebookLM-Py") as demo:
        gr.Markdown(
            "\n"
            "# 📓 NotebookLM-Py Web UI\n"
            "基于 [notebooklm-py](https://github.com/teng-lin/notebooklm-py) 的 Web 界面,\n"
            "让你可以通过浏览器使用 Google NotebookLM 的全部功能。\n",
            elem_classes=["main-title"],
        )
        with gr.Tabs():
            # ===== Tab 1: notebook management =====
            with gr.TabItem("📚 笔记本"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### 创建新笔记本")
                        new_title = gr.Textbox(label="标题", placeholder="输入笔记本标题...")
                        create_btn = gr.Button("创建", variant="primary")
                        create_result = gr.Markdown()
                    with gr.Column():
                        gr.Markdown("### 选择笔记本")
                        refresh_btn = gr.Button("🔄 刷新列表")
                        notebook_list = gr.Markdown()
                        notebook_dropdown = gr.Dropdown(label="选择", choices=[])
                        select_btn = gr.Button("选择此笔记本", variant="secondary")
                        select_result = gr.Markdown()
                # Event bindings
                create_btn.click(
                    _create_and_select,
                    inputs=[new_title],
                    outputs=[create_result, notebook_dropdown],
                )
                refresh_btn.click(
                    _refresh_notebooks,
                    inputs=[notebook_dropdown],
                    outputs=[notebook_list, notebook_dropdown],
                )
                select_btn.click(
                    select_notebook,
                    inputs=[notebook_dropdown],
                    outputs=[select_result],
                )
            # ===== Tab 2: source management =====
            with gr.TabItem("📎 资源"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### 添加 URL 资源")
                        url_input = gr.Textbox(label="URL", placeholder="https://...")
                        add_url_btn = gr.Button("添加 URL", variant="primary")
                        add_result = gr.Markdown()
                    with gr.Column():
                        gr.Markdown("### 当前资源")
                        list_sources_btn = gr.Button("🔄 刷新资源列表")
                        sources_list = gr.Markdown()
                add_url_btn.click(
                    add_url_source,
                    inputs=[url_input],
                    outputs=[add_result],
                )
                list_sources_btn.click(
                    list_sources,
                    outputs=[sources_list],
                )
            # ===== Tab 3: chat =====
            with gr.TabItem("💬 对话"):
                chatbot = gr.Chatbot(label="与资源对话", height=400)
                with gr.Row():
                    chat_input = gr.Textbox(
                        label="提问",
                        placeholder="输入你的问题...",
                        scale=4,
                    )
                    chat_btn = gr.Button("发送", variant="primary", scale=1)
                # The button and the Enter key trigger the same handler.
                chat_btn.click(
                    chat_with_sources,
                    inputs=[chat_input, chatbot],
                    outputs=[chatbot],
                )
                chat_input.submit(
                    chat_with_sources,
                    inputs=[chat_input, chatbot],
                    outputs=[chatbot],
                )
            # ===== Tab 4: content generation =====
            with gr.TabItem("🎙️ 生成"):
                gr.Markdown("### 生成内容")
                with gr.Accordion("🎧 音频播客", open=True):
                    audio_instructions = gr.Textbox(
                        label="指令(可选)",
                        placeholder="例如:让它更有趣...",
                    )
                    audio_btn = gr.Button("生成音频", variant="primary")
                    audio_status = gr.Markdown()
                    audio_output = gr.Audio(label="生成的音频")
                    audio_btn.click(
                        generate_audio,
                        inputs=[audio_instructions],
                        outputs=[audio_status, audio_output],
                    )
                with gr.Accordion("📝 测验", open=False):
                    quiz_difficulty = gr.Dropdown(
                        label="难度",
                        choices=["easy", "medium", "hard"],
                        value="medium",
                    )
                    quiz_btn = gr.Button("生成测验", variant="primary")
                    quiz_output = gr.Markdown()
                    quiz_btn.click(
                        generate_quiz,
                        inputs=[quiz_difficulty],
                        outputs=[quiz_output],
                    )
                with gr.Accordion("🃏 闪卡", open=False):
                    flashcard_btn = gr.Button("生成闪卡", variant="primary")
                    flashcard_output = gr.Markdown()
                    flashcard_btn.click(
                        generate_flashcards,
                        outputs=[flashcard_output],
                    )
        gr.Markdown(
            "\n"
            "---\n"
            "⚠️ **注意**:这是使用非官方 API 的社区项目。请遵守 Google 的服务条款。\n"
        )
    return demo
if __name__ == "__main__":
    # Script entry point: these imports are only needed when serving.
    from api import app as fastapi_app
    import gradio as gr
    import uvicorn

    demo = create_ui()
    # Mount the Gradio UI at the root path of the project's FastAPI app, then
    # serve the combined application on all interfaces, port 7860.
    app = gr.mount_gradio_app(fastapi_app, demo, path="/")
    uvicorn.run(app, host="0.0.0.0", port=7860)