"""Gradio UI for the NotebookLM-style application. Spec references: - `specs/02_architecture.md`: Gradio frontend with HF OAuth login and notebook switching. - `specs/04_interfaces.md`: all backend interactions go through module APIs. - `specs/07_security.md`: authentication and per-user isolation. - `specs/08_ui_spec.md`: login status, notebook selector, upload, chat, and artifact panels. - `specs/10_test_plan.md`: explicit error handling and testable UI helpers. """ from __future__ import annotations from pathlib import Path import os import sys from typing import Any from uuid import uuid4 import gradio as gr PROJECT_ROOT = Path(__file__).resolve().parent SRC_ROOT = PROJECT_ROOT / "src" if str(SRC_ROOT) not in sys.path: sys.path.insert(0, str(SRC_ROOT)) DATA_ROOT = Path(os.getenv("NOTEBOOKLM_DATA_ROOT", "/tmp/notebooklm")).expanduser() from ingestion.chunking import sentence_aware_chunk, semantic_chunk from ingestion.embedder import embed_texts from ingestion.extractors import ( extract_text_from_pdf, extract_text_from_pptx, extract_text_from_txt, extract_text_from_url, ) from ingestion.indexer import upsert_chunks from notebooklm_clone.artifacts import ( ArtifactRef, generate_podcast_transcript, generate_quiz, generate_report, ) from notebooklm_clone.auth import NotAuthenticatedError, get_current_user from notebooklm_clone.chat import ChatResponse, answer_question from notebooklm_clone.export import export_notebook_zip from notebooklm_clone.notebooks import ( NotebookRecord, create_notebook, list_notebooks, ) CHUNK_MAX_CHARS = 1200 CHUNK_OVERLAP_CHARS = 200 def _artifact_choices(paths: list[str]) -> list[tuple[str, str]]: """Map artifact paths into Gradio dropdown choices.""" return [(Path(path).name, path) for path in paths] def _require_user(request: gr.Request | None) -> str: """Extract the authenticated username from the request context.""" if request is None: raise NotAuthenticatedError("Authenticated request context is required.") return 
get_current_user(request) def _resolve_username( profile: gr.OAuthProfile | None, request: gr.Request | None, current_username: str | None = None, ) -> str: """Resolve the authenticated username from OAuth profile, request, or stored state.""" if profile is not None: username: str | None = getattr(profile, "username", None) if isinstance(username, str) and username.strip(): return username.strip() if current_username is not None and current_username.strip(): return current_username.strip() return _require_user(request) def _notebook_choices(notebooks: list[NotebookRecord]) -> list[tuple[str, str]]: """Map notebook records into dropdown choices.""" return [(notebook["name"], notebook["id"]) for notebook in notebooks] def _render_login_status(username: str) -> str: """Render the top-bar login status.""" return f"**Signed in as:** `{username}`" def _render_logged_out_status() -> str: """Render the top-bar status for unauthenticated visitors.""" return "**Not signed in.** Use the Hugging Face login button to continue." 
def _render_citations(citations: list[dict[str, Any]]) -> str:
    """Render structured citations as a markdown "Sources:" list for the chat panel.

    Args:
        citations: citation dicts; the keys ``marker``, ``source_name``,
            ``source_id`` and ``loc`` are read, each optional.

    Returns:
        An empty string when there are no citations; otherwise two leading
        newlines, a ``Sources:`` heading and one bullet per citation.
    """
    if not citations:
        return ""

    # The two empty entries produce a blank line between the answer and the list.
    lines: list[str] = ["", "", "Sources:"]
    for citation in citations:
        marker: str = str(citation.get("marker", ""))
        source_name: str = str(citation.get("source_name", ""))
        source_id: str = str(citation.get("source_id", ""))
        loc: Any = citation.get("loc")

        entry = f"- {marker} {source_name} (`{source_id}`)"
        # A missing location used to be rendered as the literal text "None".
        if loc is not None:
            entry = f"{entry} {loc}"
        lines.append(entry)
    return "\n".join(lines)


def _refresh_notebook_state(
    username: str,
    selected_notebook_id: str | None = None,
) -> tuple[str, gr.Dropdown]:
    """Build the login-status markdown and notebook dropdown for ``username``.

    Args:
        username: owner whose notebooks are listed via ``list_notebooks``.
        selected_notebook_id: notebook to keep selected, if it still exists.

    Returns:
        ``(login_status_markdown, dropdown)``. The dropdown value is the
        requested notebook when it is in the list, otherwise the first
        notebook, otherwise ``None`` when the user has no notebooks.
    """
    notebooks: list[NotebookRecord] = list_notebooks(username)
    notebook_ids: list[str] = [notebook["id"] for notebook in notebooks]

    value: str | None = selected_notebook_id
    if value not in notebook_ids:
        # Covers both "nothing requested" and "requested notebook is gone".
        value = notebook_ids[0] if notebook_ids else None

    return (
        _render_login_status(username),
        gr.Dropdown(choices=_notebook_choices(notebooks), value=value),
    )


def load_session(
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[str, gr.Dropdown, list[dict[str, str]], gr.Dropdown, str, str]:
    """Initialize login status and notebook selector when the UI loads.

    Returns:
        ``(login_status, notebook_dropdown, chat_history, artifact_dropdown,
        username, activity_message)``. When the visitor is not authenticated
        the logged-out status, an empty notebook dropdown and an empty
        username are returned instead of raising.
    """
    try:
        username: str = _resolve_username(profile, request)
        login_status, notebook_dropdown = _refresh_notebook_state(username)
    except NotAuthenticatedError:
        login_status = _render_logged_out_status()
        notebook_dropdown = gr.Dropdown(choices=[], value=None)
        username = ""

    empty_chat: list[dict[str, str]] = []
    artifact_dropdown = gr.Dropdown(choices=[], value=None)
    return login_status, notebook_dropdown, empty_chat, artifact_dropdown, username, "Session loaded."
def _log_best_effort_failure(message: str, *args: Any) -> None:
    """Log a swallowed exception so best-effort fallbacks remain diagnosable.

    Must be called from inside an ``except`` block: the active exception's
    traceback is attached to the warning record.
    """
    import logging  # function-scope import: the file's import section is left untouched

    logging.getLogger(__name__).warning(message, *args, exc_info=True)


def create_notebook_ui(
    notebook_name: str,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[str, gr.Dropdown, str, str, str]:
    """Create a notebook and refresh the selector.

    Returns:
        ``(login_status, notebook_dropdown, cleared_name_box, username,
        activity_message)`` with the new notebook selected in the dropdown.
    """
    username: str = _resolve_username(profile, request, current_username)
    notebook: NotebookRecord = create_notebook(username, notebook_name)
    login_status, dropdown = _refresh_notebook_state(username, notebook["id"])
    return login_status, dropdown, "", username, f"Notebook created: `{notebook['name']}`"


def on_notebook_change(
    _notebook_id: str | None,
    username: str = "",
) -> tuple[list[dict[str, str]], gr.Dropdown, str, str, list[str], str]:
    """Clear notebook-scoped UI state when the selected notebook changes.

    The list of already-indexed sources is rebuilt from the ``source_name``
    entries in the notebook collection's metadata. That lookup is best-effort:
    on any failure it is logged and the names collected so far are shown.

    Returns:
        ``(chat_history, artifact_dropdown, ingest_status, activity_message,
        uploaded_docs, uploaded_docs_markdown)``.
    """
    existing_docs: list[str] = []
    if _notebook_id and username:
        try:
            from ingestion.indexer import _get_collection

            collection = _get_collection(username, _notebook_id)
            # Get all metadata to extract unique source_names
            all_meta = collection.get(include=["metadatas"])
            seen: set[str] = set()
            for meta in all_meta.get("metadatas") or []:
                name = (meta or {}).get("source_name", "")
                if name and name not in seen:
                    seen.add(name)
                    existing_docs.append(name)
        except Exception:
            # Previously swallowed silently; the UI fallback is unchanged.
            _log_best_effort_failure(
                "Could not list existing sources for notebook %s", _notebook_id
            )

    docs_md = _render_uploaded_docs(existing_docs)
    return (
        [],
        gr.Dropdown(choices=[], value=None),
        "",
        "Notebook selection updated.",
        existing_docs,
        docs_md,
    )


def _extract_from_file(file_path: str) -> tuple[str, str]:
    """Dispatch local file extraction by suffix.

    Returns:
        ``(extracted_text, file_name)``.

    Raises:
        ValueError: when the (case-insensitive) suffix is not ``.pdf``,
            ``.pptx`` or ``.txt``.
    """
    path = Path(file_path)
    suffix: str = path.suffix.lower()
    # Extractors are looked up at call time (not in a module-level table) so
    # they can still be replaced on the module, e.g. in tests.
    if suffix == ".pdf":
        doc = extract_text_from_pdf(path)
    elif suffix == ".pptx":
        doc = extract_text_from_pptx(path)
    elif suffix == ".txt":
        doc = extract_text_from_txt(path)
    else:
        raise ValueError(f"Unsupported upload type: {suffix}")
    return doc["text"], path.name


def _generate_context_header(source_name: str, text: str) -> str:
    """Generate a one-sentence contextual summary for chunk headers.

    Uses the LLM to summarize the document, giving the embedding model
    semantic context for disambiguation. Falls back to ``source_name`` if no
    API key is set, the call fails (the failure is logged), or the model
    returns an empty summary.

    Returns:
        ``"<source_name> | <summary>"`` on success, else ``source_name``.
    """
    api_key: str = os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        return source_name
    model: str = os.getenv("NOTEBOOKLM_CHAT_MODEL", "gpt-4o-mini").strip()

    try:
        from openai import OpenAI

        client = OpenAI(api_key=api_key)
        # Use only the first 2000 chars to keep the call fast/cheap
        preview = text[:2000]
        response = client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Write ONE concise sentence summarizing what this document is about. "
                        "Focus on the specific subject matter and key topics. "
                        "Do not start with 'This document'. Just state the subject."
                    ),
                },
                {"role": "user", "content": preview},
            ],
            temperature=0.0,
            max_tokens=60,
        )
        summary = (response.choices[0].message.content or "").strip().rstrip(".")
        if summary:
            return f"{source_name} | {summary}"
    except Exception:
        # Header generation is optional; ingestion continues with the plain name.
        _log_best_effort_failure("Context header generation failed for %s", source_name)
    return source_name


def _render_uploaded_docs(doc_list: list[str]) -> str:
    """Render the uploaded documents list as markdown (numbered from 1)."""
    if not doc_list:
        return "*No sources uploaded yet.*"
    lines = ["**Uploaded Sources:**"]
    lines.extend(f"{i}. 📄 {name}" for i, name in enumerate(doc_list, 1))
    return "\n".join(lines)


def _step(msg: str) -> str:
    """Format an in-progress step message."""
    return f"⏳ **{msg}**"


def _chunk_source_text(text: str, header: str) -> list[dict[str, Any]]:
    """Chunk ``text`` with the chunker selected by ``NOTEBOOKLM_CHUNKING_METHOD``.

    ``"semantic"`` (the default) uses ``semantic_chunk``; any other value uses
    ``sentence_aware_chunk`` with the module's overlap setting.
    """
    chunking_method = os.getenv("NOTEBOOKLM_CHUNKING_METHOD", "semantic").strip().lower()
    if chunking_method == "semantic":
        return semantic_chunk(text=text, max_chars=CHUNK_MAX_CHARS, header=header)
    return sentence_aware_chunk(
        text=text,
        max_chars=CHUNK_MAX_CHARS,
        overlap_chars=CHUNK_OVERLAP_CHARS,
        header=header,
    )


def _index_source(
    *,
    username: str,
    notebook_id: str,
    source_name: str,
    source_text: str,
    docs: list[str],
    header_step: str,
    chunk_step: str,
    empty_error: str,
):
    """Run header generation, chunking, embedding and indexing for one source.

    Shared by the file and URL ingestion handlers. This is a generator: it
    yields ``(status_markdown, docs, docs_markdown)`` before each step and a
    final success tuple after ``source_name`` has been added to ``docs``.

    Args:
        header_step: progress text shown before the context header is built.
        chunk_step: progress text shown before chunking.
        empty_error: message for the ``gr.Error`` raised when chunking
            produces nothing.

    Raises:
        gr.Error: when no chunks were produced from ``source_text``.
    """
    docs_md = _render_uploaded_docs(docs)

    # Step 2: Generate contextual header
    yield _step(header_step), docs, docs_md
    header = _generate_context_header(source_name, source_text)

    # Step 3: Chunking
    yield _step(chunk_step), docs, docs_md
    chunks = _chunk_source_text(source_text, header)
    if not chunks:
        raise gr.Error(empty_error)

    # Step 4: Embedding
    yield _step(f"Embedding {len(chunks)} chunks…"), docs, docs_md
    embeddings = embed_texts([c["chunk_text"] for c in chunks])
    location_hints = [{"start_char": c["start_char"], "end_char": c["end_char"]} for c in chunks]

    # Step 5: Indexing
    yield _step(f"Indexing {len(chunks)} chunks…"), docs, docs_md
    summary = upsert_chunks(
        username=username,
        notebook_id=notebook_id,
        source_id=str(uuid4()),
        chunks=chunks,
        embeddings=embeddings,
        meta={"source_name": source_name, "location_hints": location_hints},
    )

    # Done — update docs list (a re-ingested source is not listed twice)
    if source_name not in docs:
        docs.append(source_name)
    yield (
        f"✅ Indexed **{summary['chunk_count']}** chunks from `{source_name}`.",
        docs,
        _render_uploaded_docs(docs),
    )


def ingest_upload_ui(
    notebook_id: str | None,
    file_path: str | None,
    uploaded_docs: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
):
    """Ingest an uploaded local file (generator — yields live status).

    Yields:
        ``(status_markdown, uploaded_docs, uploaded_docs_markdown)`` tuples.

    Raises:
        gr.Error: when no notebook is selected, no file was chosen, or no
            indexable text was extracted.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before uploading a source.")
    if not file_path:
        raise gr.Error("Choose a file to upload.")

    # Work on a copy so the incoming state list is not mutated.
    docs = list(uploaded_docs or [])

    # Step 1: Extract text
    yield _step("Extracting text from file…"), docs, _render_uploaded_docs(docs)
    source_text, source_name = _extract_from_file(file_path)

    yield from _index_source(
        username=username,
        notebook_id=notebook_id,
        source_name=source_name,
        source_text=source_text,
        docs=docs,
        header_step=f"Generating context header for *{source_name}*…",
        chunk_step=f"Chunking *{source_name}*…",
        empty_error="No indexable text was extracted from the source.",
    )


def ingest_url_ui(
    notebook_id: str | None,
    url: str,
    uploaded_docs: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
):
    """Ingest a URL source (generator — yields live status).

    The stripped URL is used both as the fetch target and as the source name.

    Yields:
        ``(status_markdown, uploaded_docs, uploaded_docs_markdown)`` tuples.

    Raises:
        gr.Error: when no notebook is selected, the URL is blank, or no
            indexable text was extracted.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before ingesting a URL.")
    if not url or not url.strip():
        raise gr.Error("Enter a URL to ingest.")

    source_label = url.strip()
    # Work on a copy so the incoming state list is not mutated.
    docs = list(uploaded_docs or [])

    # Step 1: Fetch URL
    yield _step("Fetching URL content…"), docs, _render_uploaded_docs(docs)
    doc = extract_text_from_url(source_label)

    yield from _index_source(
        username=username,
        notebook_id=notebook_id,
        source_name=source_label,
        source_text=doc["text"],
        docs=docs,
        header_step="Generating context header…",
        chunk_step="Chunking content…",
        empty_error="No indexable text was extracted from the URL.",
    )


def send_chat_ui(
    notebook_id: str | None,
    question: str,
    rag_mode: str,
    history: list[dict[str, str]] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[str, list[dict[str, str]]]:
    """Send one chat question and append the grounded answer to the chat history.

    A failure inside ``answer_question`` is logged and shown in the chat as an
    ``Error: ...`` assistant message instead of being raised.

    Returns:
        ``("", updated_history)`` — the empty string clears the input box.

    Raises:
        gr.Error: when no notebook is selected or the question is blank.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before sending a message.")
    if not question or not question.strip():
        raise gr.Error("Message cannot be empty.")

    cleaned_question = question.strip()
    # Always copy: the error path used to append to the caller's list in place.
    updated_history: list[dict[str, str]] = list(history or [])
    updated_history.append({"role": "user", "content": cleaned_question})

    try:
        response: ChatResponse = answer_question(username, notebook_id, cleaned_question, rag_mode)
    except Exception as e:
        _log_best_effort_failure("Chat request failed for notebook %s", notebook_id)
        reply = f"Error: {e}"
    else:
        reply = response["content"] + _render_citations(response["citations"])

    updated_history.append({"role": "assistant", "content": reply})
    return "", updated_history


def _append_artifact_path(
    current_paths: list[str] | None,
    artifact: ArtifactRef,
) -> tuple[list[str], gr.Dropdown]:
    """Append one generated artifact path and refresh the download list.

    Returns:
        ``(paths, dropdown)`` where ``paths`` is a new list (the input is not
        mutated) and the dropdown has the new artifact selected.
    """
    paths: list[str] = list(current_paths or [])
    if artifact["path"] not in paths:
        paths.append(artifact["path"])
    return paths, gr.Dropdown(choices=_artifact_choices(paths), value=artifact["path"])


def _generate_artifact_ui(
    generate: Any,
    notebook_id: str | None,
    artifact_paths: list[str] | None,
    username: str,
    *,
    missing_notebook_message: str,
    done_label: str,
) -> tuple[list[str], gr.Dropdown, str]:
    """Run one artifact generator and build the shared UI response.

    Args:
        generate: callable invoked as ``generate(username, notebook_id)`` that
            returns an ``ArtifactRef``.
        missing_notebook_message: ``gr.Error`` text when no notebook is selected.
        done_label: prefix of the ``"<label> generated: ..."`` status message.

    Raises:
        gr.Error: when ``notebook_id`` is empty.
    """
    if not notebook_id:
        raise gr.Error(missing_notebook_message)
    artifact = generate(username, notebook_id)
    paths, dropdown = _append_artifact_path(artifact_paths, artifact)
    return paths, dropdown, f"{done_label} generated: `{Path(artifact['path']).name}`"


def generate_report_ui(
    notebook_id: str | None,
    artifact_paths: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[list[str], gr.Dropdown, str]:
    """Generate a report artifact and update the download list."""
    username: str = _resolve_username(profile, request, current_username)
    return _generate_artifact_ui(
        generate_report,
        notebook_id,
        artifact_paths,
        username,
        missing_notebook_message="Select a notebook before generating a report.",
        done_label="Report",
    )


def generate_quiz_ui(
    notebook_id: str | None,
    artifact_paths: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[list[str], gr.Dropdown, str]:
    """Generate a quiz artifact and update the download list."""
    username: str = _resolve_username(profile, request, current_username)
    return _generate_artifact_ui(
        generate_quiz,
        notebook_id,
        artifact_paths,
        username,
        missing_notebook_message="Select a notebook before generating a quiz.",
        done_label="Quiz",
    )


def generate_podcast_ui(
    notebook_id: str | None,
    artifact_paths: list[str] | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[list[str], gr.Dropdown, str]:
    """Generate a podcast transcript artifact and update the download list."""
    username: str = _resolve_username(profile, request, current_username)
    return _generate_artifact_ui(
        generate_podcast_transcript,
        notebook_id,
        artifact_paths,
        username,
        missing_notebook_message="Select a notebook before generating a transcript.",
        done_label="Transcript",
    )


def select_artifact_download(artifact_path: str | None) -> Path | None:
    """Map the selected artifact path into a downloadable file (``None`` if nothing is selected)."""
    if not artifact_path:
        return None
    return Path(artifact_path)


def export_notebook_ui(
    notebook_id: str | None,
    current_username: str,
    profile: gr.OAuthProfile | None,
    request: gr.Request,
) -> tuple[Path, str]:
    """Export the selected notebook as a zip archive.

    Returns:
        ``(zip_path, activity_message)``.

    Raises:
        gr.Error: when no notebook is selected.
    """
    username: str = _resolve_username(profile, request, current_username)
    if not notebook_id:
        raise gr.Error("Select a notebook before exporting.")
    export_path: Path = export_notebook_zip(username, notebook_id)
    return export_path, f"Notebook exported: `{export_path.name}`"


# NOTE(review): the Row/Column nesting below was reconstructed from a source
# whose indentation had been flattened — confirm the container layout against
# the UI spec. Component arguments and event wiring are unchanged.
with gr.Blocks(title="NotebookLM Clone") as demo:
    # Per-session state: generated artifact paths, resolved username, and the
    # names of sources already indexed into the selected notebook.
    artifact_paths_state = gr.State(value=[])
    username_state = gr.State(value="")
    uploaded_docs_state = gr.State(value=[])

    gr.Markdown("# NotebookLM Clone")
    with gr.Row():
        login_button = gr.LoginButton()
        login_status = gr.Markdown("Not signed in.")
        notebook_dropdown = gr.Dropdown(
            label="Notebook",
            choices=[],
            value=None,
            interactive=True,
        )
    with gr.Row():
        new_notebook_name = gr.Textbox(label="New Notebook", placeholder="Create a notebook")
        create_notebook_button = gr.Button("Create Notebook", variant="primary")
    activity_status = gr.Markdown("Ready.")

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Upload")
            file_input = gr.File(
                label="Upload source",
                file_types=[".pdf", ".pptx", ".txt"],
                type="filepath",
            )
            upload_button = gr.Button("Ingest Upload")
            url_input = gr.Textbox(label="URL", placeholder="https://example.com/article")
            url_button = gr.Button("Ingest URL")
            ingest_status = gr.Markdown()
            gr.Markdown("---")
            uploaded_docs_display = gr.Markdown("*No sources uploaded yet.*")

        with gr.Column():
            gr.Markdown("## Chat")
            chat_history = gr.Chatbot(
                elem_id="chat-history",
                show_label=False,
            )
            with gr.Row():
                chat_input = gr.Textbox(
                    show_label=False,
                    placeholder="Ask a question about your sources...",
                    scale=4,
                )
                rag_mode = gr.Radio(
                    choices=["Fast", "Reasoning"],
                    value="Reasoning",
                    label="RAG Mode",
                    scale=1,
                    interactive=True,
                )
            chat_submit = gr.Button("Send", variant="primary")

        with gr.Column():
            gr.Markdown("## Artifacts")
            report_button = gr.Button("Generate Report")
            quiz_button = gr.Button("Generate Quiz")
            podcast_button = gr.Button("Generate Transcript")
            artifact_dropdown = gr.Dropdown(
                label="Generated Artifacts",
                choices=[],
                value=None,
            )
            artifact_download = gr.DownloadButton(label="Download Artifact")
            export_button = gr.Button("Export Notebook Zip")
            export_download = gr.DownloadButton(label="Download Notebook Zip")

    # Handlers declare `profile` / `request` parameters that are not listed in
    # `inputs`; only the explicit component inputs are wired here.
    demo.load(
        load_session,
        inputs=None,
        outputs=[
            login_status,
            notebook_dropdown,
            chat_history,
            artifact_dropdown,
            username_state,
            activity_status,
        ],
    )
    create_notebook_button.click(
        create_notebook_ui,
        inputs=[new_notebook_name, username_state],
        outputs=[login_status, notebook_dropdown, new_notebook_name, username_state, activity_status],
    )
    # Switching notebooks resets chat/artifact widgets, then clears the
    # artifact path state in a follow-up step.
    notebook_dropdown.change(
        on_notebook_change,
        inputs=[notebook_dropdown, username_state],
        outputs=[
            chat_history,
            artifact_dropdown,
            ingest_status,
            activity_status,
            uploaded_docs_state,
            uploaded_docs_display,
        ],
    ).then(
        lambda: [],
        inputs=None,
        outputs=[artifact_paths_state],
    )
    upload_button.click(
        ingest_upload_ui,
        inputs=[notebook_dropdown, file_input, uploaded_docs_state, username_state],
        outputs=[ingest_status, uploaded_docs_state, uploaded_docs_display],
    )
    url_button.click(
        ingest_url_ui,
        inputs=[notebook_dropdown, url_input, uploaded_docs_state, username_state],
        outputs=[ingest_status, uploaded_docs_state, uploaded_docs_display],
    )
    # The Send button and pressing Enter in the textbox share one handler.
    chat_submit.click(
        send_chat_ui,
        inputs=[notebook_dropdown, chat_input, rag_mode, chat_history, username_state],
        outputs=[chat_input, chat_history],
    )
    chat_input.submit(
        send_chat_ui,
        inputs=[notebook_dropdown, chat_input, rag_mode, chat_history, username_state],
        outputs=[chat_input, chat_history],
    )
    report_button.click(
        generate_report_ui,
        inputs=[notebook_dropdown, artifact_paths_state, username_state],
        outputs=[artifact_paths_state, artifact_dropdown, activity_status],
    )
    quiz_button.click(
        generate_quiz_ui,
        inputs=[notebook_dropdown, artifact_paths_state, username_state],
        outputs=[artifact_paths_state, artifact_dropdown, activity_status],
    )
    podcast_button.click(
        generate_podcast_ui,
        inputs=[notebook_dropdown, artifact_paths_state, username_state],
        outputs=[artifact_paths_state, artifact_dropdown, activity_status],
    )
    artifact_dropdown.change(
        select_artifact_download,
        inputs=[artifact_dropdown],
        outputs=[artifact_download],
    )
    export_button.click(
        export_notebook_ui,
        inputs=[notebook_dropdown, username_state],
        outputs=[export_download, activity_status],
    )


if __name__ == "__main__":
    # DATA_ROOT is passed as an allowed path so files under it can be served for download.
    demo.launch(allowed_paths=[str(DATA_ROOT)])