# Abhinav Biju
# fast/thinking toggle
# cc2dc62
"""Gradio UI for the NotebookLM-style application.
Spec references:
- `specs/02_architecture.md`: Gradio frontend with HF OAuth login and notebook switching.
- `specs/04_interfaces.md`: all backend interactions go through module APIs.
- `specs/07_security.md`: authentication and per-user isolation.
- `specs/08_ui_spec.md`: login status, notebook selector, upload, chat, and artifact panels.
- `specs/10_test_plan.md`: explicit error handling and testable UI helpers.
"""
from __future__ import annotations
from pathlib import Path
import os
import sys
from typing import Any
from uuid import uuid4
import gradio as gr
PROJECT_ROOT = Path(__file__).resolve().parent
SRC_ROOT = PROJECT_ROOT / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
DATA_ROOT = Path(os.getenv("NOTEBOOKLM_DATA_ROOT", "/tmp/notebooklm")).expanduser()
from ingestion.chunking import sentence_aware_chunk, semantic_chunk
from ingestion.embedder import embed_texts
from ingestion.extractors import (
extract_text_from_pdf,
extract_text_from_pptx,
extract_text_from_txt,
extract_text_from_url,
)
from ingestion.indexer import upsert_chunks
from notebooklm_clone.artifacts import (
ArtifactRef,
generate_podcast_transcript,
generate_quiz,
generate_report,
)
from notebooklm_clone.auth import NotAuthenticatedError, get_current_user
from notebooklm_clone.chat import ChatResponse, answer_question
from notebooklm_clone.export import export_notebook_zip
from notebooklm_clone.notebooks import (
NotebookRecord,
create_notebook,
list_notebooks,
)
# Maximum chunk size in characters; passed as `max_chars` to both chunkers.
CHUNK_MAX_CHARS = 1200
# Overlap between consecutive chunks in characters; only the sentence-aware
# chunker receives it (as `overlap_chars`).
CHUNK_OVERLAP_CHARS = 200
def _artifact_choices(paths: list[str]) -> list[tuple[str, str]]:
"""Map artifact paths into Gradio dropdown choices."""
return [(Path(path).name, path) for path in paths]
def _require_user(request: gr.Request | None) -> str:
    """Return the username that `get_current_user` reports for ``request``.

    Raises:
        NotAuthenticatedError: if ``request`` is ``None`` (raised here), or
            whatever `get_current_user` raises for the given request.
    """
    if request is not None:
        return get_current_user(request)
    raise NotAuthenticatedError("Authenticated request context is required.")
def _resolve_username(
profile: gr.OAuthProfile | None,
request: gr.Request | None,
current_username: str | None = None,
) -> str:
"""Resolve the authenticated username from OAuth profile, request, or stored state."""
if profile is not None:
username: str | None = getattr(profile, "username", None)
if isinstance(username, str) and username.strip():
return username.strip()
if current_username is not None and current_username.strip():
return current_username.strip()
return _require_user(request)
def _notebook_choices(notebooks: list[NotebookRecord]) -> list[tuple[str, str]]:
"""Map notebook records into dropdown choices."""
return [(notebook["name"], notebook["id"]) for notebook in notebooks]
def _render_login_status(username: str) -> str:
"""Render the top-bar login status."""
return f"**Signed in as:** `{username}`"
def _render_logged_out_status() -> str:
"""Render the top-bar status for unauthenticated visitors."""
return "**Not signed in.** Use the Hugging Face login button to continue."
def _render_citations(citations: list[dict[str, Any]]) -> str:
"""Render structured citations into markdown for the chat panel."""
if not citations:
return ""
lines: list[str] = ["", "", "Sources:"]
for citation in citations:
marker: str = str(citation.get("marker", ""))
source_name: str = str(citation.get("source_name", ""))
source_id: str = str(citation.get("source_id", ""))
loc: Any = citation.get("loc")
lines.append(f"- {marker} {source_name} (`{source_id}`) {loc}")
return "\n".join(lines)
def _refresh_notebook_state(
    username: str,
    selected_notebook_id: str | None = None,
) -> tuple[str, gr.Dropdown]:
    """Return the login-status markdown and a refreshed notebook dropdown.

    The dropdown lists every notebook `list_notebooks` returns for
    ``username``. ``selected_notebook_id`` stays selected when it names one of
    those notebooks; otherwise the first notebook is selected, or nothing when
    the user has no notebooks.
    """
    records: list[NotebookRecord] = list_notebooks(username)
    dropdown_choices: list[tuple[str, str]] = _notebook_choices(records)

    known_ids = {record["id"] for record in records}
    selection: str | None = selected_notebook_id
    if selection is None or selection not in known_ids:
        # Nothing requested, or the requested id is stale: fall back.
        selection = records[0]["id"] if records else None

    status_md = _render_login_status(username)
    return status_md, gr.Dropdown(choices=dropdown_choices, value=selection)
def load_session(
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[str, gr.Dropdown, list[dict[str, str]], gr.Dropdown, str, str]:
    """Produce the initial UI state when the page loads.

    Returns, in order: login-status markdown, notebook dropdown, an empty chat
    history, an empty artifact dropdown, the resolved username ("" when the
    visitor is not authenticated) and an activity message.
    """
    try:
        username: str = _resolve_username(profile, request)
        status_md, notebooks_dd = _refresh_notebook_state(username)
    except NotAuthenticatedError:
        # Anonymous visitor: show the logged-out banner and no notebooks.
        status_md = _render_logged_out_status()
        notebooks_dd = gr.Dropdown(choices=[], value=None)
        username = ""

    fresh_chat: list[dict[str, str]] = []
    artifacts_dd = gr.Dropdown(choices=[], value=None)
    return (
        status_md,
        notebooks_dd,
        fresh_chat,
        artifacts_dd,
        username,
        "Session loaded.",
    )
def create_notebook_ui(
    notebook_name: str,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[str, gr.Dropdown, str, str, str]:
    """Create a notebook for the current user and select it in the dropdown.

    Returns, in order: login-status markdown, refreshed notebook dropdown, an
    empty string (clears the name textbox), the resolved username and an
    activity message naming the new notebook.
    """
    username: str = _resolve_username(profile, request, current_username)
    created: NotebookRecord = create_notebook(username, notebook_name)
    status_md, notebooks_dd = _refresh_notebook_state(username, created["id"])
    activity = f"Notebook created: `{created['name']}`"
    return status_md, notebooks_dd, "", username, activity
def on_notebook_change(_notebook_id: str | None, username: str = "") -> tuple[list[dict[str, str]], gr.Dropdown, str, str, list[str], str]:
    """Reset notebook-scoped UI state when the selected notebook changes.

    When both a notebook id and a username are available, the notebook's
    index collection is read and the distinct ``source_name`` metadata values
    are collected (first-seen order) so the uploaded-sources panel can be
    repopulated.

    Args:
        _notebook_id: Newly selected notebook id, or ``None``.
        username: Username held in UI state; "" when unknown.

    Returns:
        In order: empty chat history, emptied artifact dropdown, empty ingest
        status, activity message, list of source names, and that list
        rendered as markdown.
    """
    existing_docs: list[str] = []
    if _notebook_id and username:
        try:
            from ingestion.indexer import _get_collection

            collection = _get_collection(username, _notebook_id)
            # Only metadata is needed to recover the distinct source names.
            all_meta = collection.get(include=["metadatas"])
            seen: set[str] = set()
            for meta in all_meta.get("metadatas") or []:
                name = (meta or {}).get("source_name", "")
                if name and name not in seen:
                    seen.add(name)
                    existing_docs.append(name)
        except Exception:
            # Best effort: the notebook switch must still succeed when the
            # index cannot be read, but record the failure rather than
            # dropping it silently.
            import logging

            logging.getLogger(__name__).exception(
                "Could not list indexed sources for notebook %r", _notebook_id
            )
    docs_md = _render_uploaded_docs(existing_docs)
    return [], gr.Dropdown(choices=[], value=None), "", "Notebook selection updated.", existing_docs, docs_md
def _extract_from_file(file_path: str) -> tuple[str, str]:
"""Dispatch local file extraction by suffix."""
path = Path(file_path)
suffix: str = path.suffix.lower()
if suffix == ".pdf":
doc = extract_text_from_pdf(path)
elif suffix == ".pptx":
doc = extract_text_from_pptx(path)
elif suffix == ".txt":
doc = extract_text_from_txt(path)
else:
raise ValueError(f"Unsupported upload type: {suffix}")
return doc["text"], path.name
def _generate_context_header(source_name: str, text: str) -> str:
"""Generate a one-sentence contextual summary for chunk headers.
Uses the LLM to summarize the document, giving the embedding model
semantic context for disambiguation. Falls back to source_name if
no API key is set or the call fails.
"""
api_key: str = os.getenv("OPENAI_API_KEY", "").strip()
model: str = os.getenv("NOTEBOOKLM_CHAT_MODEL", "gpt-4o-mini").strip()
if not api_key:
return source_name
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
# Use only the first 2000 chars to keep the call fast/cheap
preview = text[:2000]
response = client.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": (
"Write ONE concise sentence summarizing what this document is about. "
"Focus on the specific subject matter and key topics. "
"Do not start with 'This document'. Just state the subject."
),
},
{"role": "user", "content": preview},
],
temperature=0.0,
max_tokens=60,
)
summary = (response.choices[0].message.content or "").strip().rstrip(".")
if summary:
return f"{source_name} | {summary}"
except Exception:
pass
return source_name
def _render_uploaded_docs(doc_list: list[str]) -> str:
"""Render the uploaded documents list as markdown."""
if not doc_list:
return "*No sources uploaded yet.*"
lines = ["**Uploaded Sources:**"]
for i, name in enumerate(doc_list, 1):
lines.append(f"{i}. 📄 {name}")
return "\n".join(lines)
def _step(msg: str) -> str:
"""Format an in-progress step message."""
return f"⏳ **{msg}**"
def ingest_upload_ui(
    notebook_id: str | None,
    file_path: str | None,
    uploaded_docs: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
):
    """Ingest an uploaded file, yielding progress as each stage starts.

    Stages: extract text -> build context header -> chunk -> embed -> index.
    Every yield is ``(status_markdown, source_names, source_names_markdown)``.
    The source list only gains the new file name on the final yield.

    Raises:
        gr.Error: if no notebook is selected, no file was chosen, or chunking
            produced nothing to index.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before uploading a source.")
    if not file_path:
        raise gr.Error("Choose a file to upload.")

    # Work on a copy so the incoming state list is not modified.
    known_sources = list(uploaded_docs or [])
    sources_md = _render_uploaded_docs(known_sources)

    # 1) Text extraction.
    yield _step("Extracting text from file…"), known_sources, sources_md
    source_text, source_name = _extract_from_file(file_path)

    # 2) Contextual header.
    yield _step(f"Generating context header for *{source_name}*…"), known_sources, sources_md
    header = _generate_context_header(source_name, source_text)

    # 3) Chunking; the strategy comes from NOTEBOOKLM_CHUNKING_METHOD.
    yield _step(f"Chunking *{source_name}*…"), known_sources, sources_md
    strategy = os.getenv("NOTEBOOKLM_CHUNKING_METHOD", "semantic").strip().lower()
    if strategy == "semantic":
        chunks = semantic_chunk(text=source_text, max_chars=CHUNK_MAX_CHARS, header=header)
    else:
        chunks = sentence_aware_chunk(
            text=source_text,
            max_chars=CHUNK_MAX_CHARS,
            overlap_chars=CHUNK_OVERLAP_CHARS,
            header=header,
        )
    if not chunks:
        raise gr.Error("No indexable text was extracted from the source.")
    chunk_total = len(chunks)

    # 4) Embedding.
    yield _step(f"Embedding {chunk_total} chunks…"), known_sources, sources_md
    embeddings = embed_texts([chunk["chunk_text"] for chunk in chunks])
    location_hints = [
        {"start_char": chunk["start_char"], "end_char": chunk["end_char"]}
        for chunk in chunks
    ]

    # 5) Indexing under a freshly generated source id.
    yield _step(f"Indexing {chunk_total} chunks…"), known_sources, sources_md
    summary = upsert_chunks(
        username=username,
        notebook_id=notebook_id,
        source_id=str(uuid4()),
        chunks=chunks,
        embeddings=embeddings,
        meta={"source_name": source_name, "location_hints": location_hints},
    )

    # Finished: record the source name once and report the result.
    if source_name not in known_sources:
        known_sources.append(source_name)
    yield (
        f"✅ Indexed **{summary['chunk_count']}** chunks from `{source_name}`.",
        known_sources,
        _render_uploaded_docs(known_sources),
    )
def ingest_url_ui(
    notebook_id: str | None,
    url: str,
    uploaded_docs: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
):
    """Ingest a web page by URL, yielding progress as each stage starts.

    Stages: fetch -> build context header -> chunk -> embed -> index. Every
    yield is ``(status_markdown, source_names, source_names_markdown)``. The
    whitespace-stripped URL doubles as the source name and is added to the
    list only on the final yield.

    Raises:
        gr.Error: if no notebook is selected, the URL is blank, or chunking
            produced nothing to index.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before ingesting a URL.")
    if not url or not url.strip():
        raise gr.Error("Enter a URL to ingest.")

    source_label = url.strip()
    # Work on a copy so the incoming state list is not modified.
    known_sources = list(uploaded_docs or [])
    sources_md = _render_uploaded_docs(known_sources)

    # 1) Fetch the page.
    yield _step("Fetching URL content…"), known_sources, sources_md
    fetched = extract_text_from_url(source_label)

    # 2) Contextual header.
    yield _step("Generating context header…"), known_sources, sources_md
    header = _generate_context_header(source_label, fetched["text"])

    # 3) Chunking; the strategy comes from NOTEBOOKLM_CHUNKING_METHOD.
    yield _step("Chunking content…"), known_sources, sources_md
    strategy = os.getenv("NOTEBOOKLM_CHUNKING_METHOD", "semantic").strip().lower()
    if strategy == "semantic":
        chunks = semantic_chunk(text=fetched["text"], max_chars=CHUNK_MAX_CHARS, header=header)
    else:
        chunks = sentence_aware_chunk(
            text=fetched["text"],
            max_chars=CHUNK_MAX_CHARS,
            overlap_chars=CHUNK_OVERLAP_CHARS,
            header=header,
        )
    if not chunks:
        raise gr.Error("No indexable text was extracted from the URL.")
    chunk_total = len(chunks)

    # 4) Embedding.
    yield _step(f"Embedding {chunk_total} chunks…"), known_sources, sources_md
    embeddings = embed_texts([chunk["chunk_text"] for chunk in chunks])
    location_hints = [
        {"start_char": chunk["start_char"], "end_char": chunk["end_char"]}
        for chunk in chunks
    ]

    # 5) Indexing under a freshly generated source id.
    yield _step(f"Indexing {chunk_total} chunks…"), known_sources, sources_md
    summary = upsert_chunks(
        username=username,
        notebook_id=notebook_id,
        source_id=str(uuid4()),
        chunks=chunks,
        embeddings=embeddings,
        meta={"source_name": source_label, "location_hints": location_hints},
    )

    # Finished: record the URL once and report the result.
    if source_label not in known_sources:
        known_sources.append(source_label)
    yield (
        f"✅ Indexed **{summary['chunk_count']}** chunks from `{source_label}`.",
        known_sources,
        _render_uploaded_docs(known_sources),
    )
def send_chat_ui(
    notebook_id: str | None,
    question: str,
    rag_mode: str,
    history: list[dict[str, str]] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[str, list[dict[str, str]]]:
    """Send one chat question and append the exchange to the chat history.

    Args:
        notebook_id: Selected notebook; required.
        question: Raw textbox content; stripped before use.
        rag_mode: Passed through unchanged to `answer_question`.
        history: Existing chat messages, or ``None``. Never modified.
        current_username: Username held in UI state.
        profile: OAuth profile, when available.
        request: Current request.

    Returns:
        ``("", new_history)``: the empty string clears the input box and
        ``new_history`` is a copy of ``history`` plus the user message and
        the assistant reply. If `answer_question` raises, the reply is
        ``"Error: <exception>"`` so the failure shows up in the chat.

    Raises:
        gr.Error: if no notebook is selected or the question is blank.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before sending a message.")
    if not question or not question.strip():
        raise gr.Error("Message cannot be empty.")

    asked: str = question.strip()
    # Always work on a copy. The error path used to append to the caller's
    # list in place while the success path copied it.
    updated_history: list[dict[str, str]] = list(history or [])
    updated_history.append({"role": "user", "content": asked})

    try:
        response: ChatResponse = answer_question(username, notebook_id, asked, rag_mode)
    except Exception as e:
        # Show backend failures in the conversation instead of aborting the
        # event, so the user's question is not lost from the transcript.
        reply: str = f"Error: {e}"
    else:
        reply = response["content"] + _render_citations(response["citations"])

    updated_history.append({"role": "assistant", "content": reply})
    return "", updated_history
def _append_artifact_path(current_paths: list[str] | None, artifact: ArtifactRef) -> tuple[list[str], gr.Dropdown]:
    """Add an artifact's path to the known list and rebuild the dropdown.

    Returns a new list (the input list is left untouched; duplicates are not
    re-added) and a dropdown listing every path with the new one selected.
    """
    new_path = artifact["path"]
    known_paths: list[str] = list(current_paths or [])
    if new_path not in known_paths:
        known_paths.append(new_path)
    refreshed = gr.Dropdown(choices=_artifact_choices(known_paths), value=artifact["path"])
    return known_paths, refreshed
def generate_report_ui(
    notebook_id: str | None,
    artifact_paths: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[list[str], gr.Dropdown, str]:
    """Generate a report for the selected notebook.

    Returns the updated artifact path list, the refreshed artifact dropdown
    and an activity message naming the generated file.

    Raises:
        gr.Error: if no notebook is selected.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before generating a report.")
    report = generate_report(username, notebook_id)
    known_paths, refreshed = _append_artifact_path(artifact_paths, report)
    file_name = Path(report["path"]).name
    return known_paths, refreshed, f"Report generated: `{file_name}`"
def generate_quiz_ui(
    notebook_id: str | None,
    artifact_paths: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[list[str], gr.Dropdown, str]:
    """Generate a quiz for the selected notebook.

    Returns the updated artifact path list, the refreshed artifact dropdown
    and an activity message naming the generated file.

    Raises:
        gr.Error: if no notebook is selected.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before generating a quiz.")
    quiz = generate_quiz(username, notebook_id)
    known_paths, refreshed = _append_artifact_path(artifact_paths, quiz)
    file_name = Path(quiz["path"]).name
    return known_paths, refreshed, f"Quiz generated: `{file_name}`"
def generate_podcast_ui(
    notebook_id: str | None,
    artifact_paths: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[list[str], gr.Dropdown, str]:
    """Generate a podcast transcript for the selected notebook.

    Returns the updated artifact path list, the refreshed artifact dropdown
    and an activity message naming the generated file.

    Raises:
        gr.Error: if no notebook is selected.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before generating a transcript.")
    transcript = generate_podcast_transcript(username, notebook_id)
    known_paths, refreshed = _append_artifact_path(artifact_paths, transcript)
    file_name = Path(transcript["path"]).name
    return known_paths, refreshed, f"Transcript generated: `{file_name}`"
def select_artifact_download(artifact_path: str | None) -> Path | None:
    """Convert the dropdown selection into a `Path` for the download button.

    Returns ``None`` when nothing is selected (``None`` or an empty string).
    """
    return Path(artifact_path) if artifact_path else None
def export_notebook_ui(
    notebook_id: str | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[Path, str]:
    """Export the selected notebook to a zip archive.

    Returns the archive path (for the download button) and an activity
    message naming the archive file.

    Raises:
        gr.Error: if no notebook is selected.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before exporting.")
    archive: Path = export_notebook_zip(username, notebook_id)
    activity = f"Notebook exported: `{archive.name}`"
    return archive, activity
# Top-level UI: declares the layout, then wires each component event to the
# handler functions defined above.
# NOTE(review): this file reached review with its indentation stripped. The
# Row/Column nesting below is reconstructed from component order; in
# particular confirm whether `notebook_dropdown` belongs in the top row,
# whether `activity_status` sits outside the rows, and whether `chat_submit`
# sits beside or below the chat input row.
with gr.Blocks(title="NotebookLM Clone") as demo:
    # State holders passed into and out of the handlers.
    artifact_paths_state = gr.State(value=[])  # paths of generated artifacts
    username_state = gr.State(value="")  # resolved username; "" until known
    uploaded_docs_state = gr.State(value=[])  # source names for the notebook
    gr.Markdown("# NotebookLM Clone")
    # Top bar: login button, login status, notebook selector.
    with gr.Row():
        login_button = gr.LoginButton()
        login_status = gr.Markdown("Not signed in.")
        notebook_dropdown = gr.Dropdown(
            label="Notebook",
            choices=[],
            value=None,
            interactive=True,
        )
    # Notebook creation controls.
    with gr.Row():
        new_notebook_name = gr.Textbox(label="New Notebook", placeholder="Create a notebook")
        create_notebook_button = gr.Button("Create Notebook", variant="primary")
    # Shared status line written by session, notebook, artifact and export handlers.
    activity_status = gr.Markdown("Ready.")
    # Main area: upload | chat | artifacts.
    with gr.Row():
        with gr.Column():
            gr.Markdown("## Upload")
            # File types match the suffixes `_extract_from_file` accepts.
            file_input = gr.File(
                label="Upload source",
                file_types=[".pdf", ".pptx", ".txt"],
                type="filepath",
            )
            upload_button = gr.Button("Ingest Upload")
            url_input = gr.Textbox(label="URL", placeholder="https://example.com/article")
            url_button = gr.Button("Ingest URL")
            # Live progress text yielded by the ingest generators.
            ingest_status = gr.Markdown()
            gr.Markdown("---")
            uploaded_docs_display = gr.Markdown("*No sources uploaded yet.*")
        with gr.Column():
            gr.Markdown("## Chat")
            chat_history = gr.Chatbot(
                elem_id="chat-history",
                show_label=False,
            )
            with gr.Row():
                chat_input = gr.Textbox(
                    show_label=False,
                    placeholder="Ask a question about your sources...",
                    scale=4,
                )
                # Selected value is forwarded as `rag_mode` to `send_chat_ui`.
                rag_mode = gr.Radio(
                    choices=["Fast", "Reasoning"],
                    value="Reasoning",
                    label="RAG Mode",
                    scale=1,
                    interactive=True,
                )
            chat_submit = gr.Button("Send", variant="primary")
        with gr.Column():
            gr.Markdown("## Artifacts")
            report_button = gr.Button("Generate Report")
            quiz_button = gr.Button("Generate Quiz")
            podcast_button = gr.Button("Generate Transcript")
            artifact_dropdown = gr.Dropdown(
                label="Generated Artifacts",
                choices=[],
                value=None,
            )
            artifact_download = gr.DownloadButton(label="Download Artifact")
            export_button = gr.Button("Export Notebook Zip")
            export_download = gr.DownloadButton(label="Download Notebook Zip")
    # --- Event wiring ---
    # `profile` and `request` are not listed in `inputs`; the handlers declare
    # them as `gr.OAuthProfile | None` / `gr.Request` parameters -- presumably
    # Gradio injects them by type annotation; confirm for the pinned version.
    # Page load: resolve the user and populate the notebook selector.
    demo.load(
        load_session,
        inputs=None,
        outputs=[login_status, notebook_dropdown, chat_history, artifact_dropdown, username_state, activity_status],
    )
    create_notebook_button.click(
        create_notebook_ui,
        inputs=[new_notebook_name, username_state],
        outputs=[login_status, notebook_dropdown, new_notebook_name, username_state, activity_status],
    )
    # Notebook switch: clear notebook-scoped panels, then empty the stored
    # artifact path list in a follow-up step.
    notebook_dropdown.change(
        on_notebook_change,
        inputs=[notebook_dropdown, username_state],
        outputs=[chat_history, artifact_dropdown, ingest_status, activity_status, uploaded_docs_state, uploaded_docs_display],
    ).then(
        lambda: [],
        inputs=None,
        outputs=[artifact_paths_state],
    )
    # Ingestion handlers are generators; each yield updates these outputs.
    upload_button.click(
        ingest_upload_ui,
        inputs=[notebook_dropdown, file_input, uploaded_docs_state, username_state],
        outputs=[ingest_status, uploaded_docs_state, uploaded_docs_display],
    )
    url_button.click(
        ingest_url_ui,
        inputs=[notebook_dropdown, url_input, uploaded_docs_state, username_state],
        outputs=[ingest_status, uploaded_docs_state, uploaded_docs_display],
    )
    # The Send button and the textbox submit event share one handler and wiring.
    chat_submit.click(
        send_chat_ui,
        inputs=[notebook_dropdown, chat_input, rag_mode, chat_history, username_state],
        outputs=[chat_input, chat_history],
    )
    chat_input.submit(
        send_chat_ui,
        inputs=[notebook_dropdown, chat_input, rag_mode, chat_history, username_state],
        outputs=[chat_input, chat_history],
    )
    # Artifact generators all update the path list, the dropdown and the status line.
    report_button.click(
        generate_report_ui,
        inputs=[notebook_dropdown, artifact_paths_state, username_state],
        outputs=[artifact_paths_state, artifact_dropdown, activity_status],
    )
    quiz_button.click(
        generate_quiz_ui,
        inputs=[notebook_dropdown, artifact_paths_state, username_state],
        outputs=[artifact_paths_state, artifact_dropdown, activity_status],
    )
    podcast_button.click(
        generate_podcast_ui,
        inputs=[notebook_dropdown, artifact_paths_state, username_state],
        outputs=[artifact_paths_state, artifact_dropdown, activity_status],
    )
    # Selecting an artifact points the download button at that file.
    artifact_dropdown.change(
        select_artifact_download,
        inputs=[artifact_dropdown],
        outputs=[artifact_download],
    )
    export_button.click(
        export_notebook_ui,
        inputs=[notebook_dropdown, username_state],
        outputs=[export_download, activity_status],
    )
# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    # DATA_ROOT is passed as an allowed path -- presumably so artifacts and
    # exports stored under it can be served for download; confirm where the
    # backend modules write their files.
    demo.launch(allowed_paths=[str(DATA_ROOT)])