| """ |
| NotebookLM Clone - Gradio UI (professional dark theme). |
| Per-user isolation; sidebar + main tabs (Sources, Chat, Artifacts). |
| """ |
| import traceback |
| from pathlib import Path |
| from typing import Any, Dict, List, Tuple |
|
|
| import gradio as gr |
|
|
| from backend import artifacts as artifacts_module |
| from backend import ingestion |
| from backend import notebooks as notebooks_module |
| from backend import rag |
| from backend.auth import get_username_from_request |
| from backend.config import ( |
| MAX_CHAT_HISTORY_LOAD, |
| RETRIEVAL_STRATEGY_MMR, |
| RETRIEVAL_STRATEGY_SIMILARITY, |
| TOP_K, |
| ) |
| from backend.storage import chat_messages_path |
| from backend.utils import append_jsonl, read_jsonl, logger |
|
|
|
|
| |
def initial_state() -> Dict[str, Any]:
    """Build the blank session state used before any notebook is loaded.

    Returns:
        A new dict with ``notebook_id`` and ``username`` set to ``None`` and
        an empty ``notebooks`` list. A fresh dict (and list) is created on
        every call.
    """
    return dict(notebook_id=None, notebooks=[], username=None)
|
|
|
|
| |
def list_notebooks_for_user(request: gr.Request) -> Tuple[Dict, str, str, Any]:
    """Load the requesting user's notebooks and select the first one.

    Args:
        request: Gradio request used to resolve the username.

    Returns:
        ``(state, notebook_id, status, dropdown)`` where ``notebook_id`` is
        the id of the first notebook (``""`` when the user has none) and
        ``dropdown`` lists the notebook names.
    """
    username = get_username_from_request(request)
    notebooks = notebooks_module.list_notebooks(username)

    if notebooks:
        labels = [nb.get("name", "Untitled") for nb in notebooks]
        first_id = notebooks[0].get("id", "")
        session = {"notebook_id": first_id, "notebooks": notebooks, "username": username}
        return session, first_id, "Ready.", gr.Dropdown(choices=labels, value=labels[0])

    # No notebooks: start from the blank state but remember who is logged in.
    session = initial_state()
    session["username"] = username
    return session, "", "No notebooks yet. Create one.", gr.Dropdown(choices=[], value=None)
|
|
|
|
def create_notebook_handler(name: str, request: gr.Request) -> Tuple[Dict, str, str, gr.Dropdown]:
    """Create a notebook for the requesting user and select it.

    Args:
        name: Requested notebook name; blank or whitespace-only input falls
            back to ``"Untitled Notebook"``.
        request: Gradio request used to resolve the username.

    Returns:
        ``(state, notebook_id, status, dropdown)``. On failure the state is
        reset to the blank state (keeping the username), the id is ``""``
        and the status carries the error message.
    """
    username = get_username_from_request(request)
    # Whitespace-only input is treated the same as no input.
    title = (name or "").strip() or "Untitled Notebook"
    try:
        nb = notebooks_module.create_notebook(username, title)
        notebooks = notebooks_module.list_notebooks(username)
        names = [n.get("name", "Untitled") for n in notebooks]
        return (
            {"notebook_id": nb["id"], "notebooks": notebooks, "username": username},
            nb["id"],
            f"Created '{nb['name']}'.",
            gr.Dropdown(choices=names, value=nb["name"]),
        )
    except Exception as e:
        logger.exception("create_notebook")
        # Keep the authenticated username in the state: other handlers read
        # it back from the session state.
        failed_state = initial_state()
        failed_state["username"] = username
        return failed_state, "", f"Error: {e}", gr.Dropdown(choices=[], value=None)
|
|
|
|
def delete_notebook_handler(notebook_id: str, request: gr.Request) -> Tuple[Dict, str, str, gr.Dropdown]:
    """Delete a notebook and select the first remaining one, if any.

    Args:
        notebook_id: Id of the notebook to delete.
        request: Gradio request used to resolve the username.

    Returns:
        ``(state, notebook_id, status, dropdown)``. When no notebook is
        selected or the deletion fails, the state is reset to the blank
        state (keeping the username) and the status explains why.
    """
    username = get_username_from_request(request)

    def _reset(status: str):
        # Blank state, but still tied to the authenticated user so handlers
        # that read the username from the session state keep working.
        blank = initial_state()
        blank["username"] = username
        return blank, "", status, gr.Dropdown(choices=[], value=None)

    if not notebook_id:
        return _reset("Select a notebook first.")
    try:
        notebooks_module.delete_notebook(username, notebook_id)
        notebooks = notebooks_module.list_notebooks(username)
        names = [n.get("name", "Untitled") for n in notebooks]
        next_id = notebooks[0].get("id", "") if notebooks else ""
        next_state = {"notebook_id": next_id, "notebooks": notebooks, "username": username}
        return (
            next_state,
            next_id,
            "Notebook deleted.",
            gr.Dropdown(choices=names, value=names[0] if names else None),
        )
    except Exception as e:
        logger.exception("delete_notebook")
        return _reset(f"Error: {e}")
|
|
|
|
def rename_notebook_handler(notebook_id: str, new_name: str, request: gr.Request) -> Tuple[str, gr.Dropdown]:
    """Rename a notebook and refresh the notebook dropdown.

    Args:
        notebook_id: Id of the notebook to rename.
        new_name: The new display name; must not be blank.
        request: Gradio request used to resolve the username.

    Returns:
        ``(status, dropdown)``. The dropdown is left untouched
        (``gr.update()``) when validation fails or the rename raises.
    """
    username = get_username_from_request(request)
    if not notebook_id:
        return "Select a notebook first.", gr.update()
    cleaned = (new_name or "").strip()
    if not cleaned:
        return "Enter a new name.", gr.update()
    try:
        nb = notebooks_module.rename_notebook(username, notebook_id, cleaned)
        notebooks = notebooks_module.list_notebooks(username)
        names = [n.get("name", "Untitled") for n in notebooks]
        return f"Renamed to '{nb['name']}'.", gr.Dropdown(choices=names, value=nb["name"])
    except Exception as e:
        # Log like the create/delete handlers do, so failures are traceable.
        logger.exception("rename_notebook")
        return f"Error: {e}", gr.update()
|
|
|
|
def switch_notebook_handler(notebook_id: str, request: gr.Request) -> Tuple[Dict, str, List, str]:
    """Switch the session to ``notebook_id`` and load its recent chat.

    Args:
        notebook_id: Id of the notebook to activate; empty means "none".
        request: Gradio request used to resolve the username.

    Returns:
        ``(state, notebook_id, chat_history, citations_markdown)``. The chat
        history holds at most ``MAX_CHAT_HISTORY_LOAD`` stored messages, each
        as a one-sided ``(user, assistant)`` tuple.
    """
    username = get_username_from_request(request)
    if not notebook_id:
        # Keep the authenticated username even when nothing is selected, so
        # the session state never claims an unknown user.
        empty_state = initial_state()
        empty_state["username"] = username
        return empty_state, "", [], ""

    notebooks = notebooks_module.list_notebooks(username)
    state = {"notebook_id": notebook_id, "notebooks": notebooks, "username": username}
    path = chat_messages_path(username, notebook_id)
    messages = read_jsonl(path)
    messages = messages[-MAX_CHAT_HISTORY_LOAD:]
    chat_ui = []
    for m in messages:
        role = m.get("role", "user")
        content = m.get("content", "")
        # Each stored message becomes its own row; the other side is None.
        chat_ui.append((content if role == "user" else None, content if role == "assistant" else None))
    cite_text = _last_citations_markdown(messages)
    return state, notebook_id, chat_ui, cite_text
|
|
|
|
def _last_citations_markdown(messages: List[Dict]) -> str:
    """Render the citations of the most recent assistant message that has any.

    Args:
        messages: Stored chat messages, oldest first.

    Returns:
        Markdown with one numbered entry per citation (snippets cut to 250
        characters), separated by horizontal rules; ``""`` when no assistant
        message carries citations.
    """
    for msg in reversed(messages):
        if msg.get("role") != "assistant":
            continue
        cites = msg.get("citations") or []
        if not cites:
            continue
        parts = []
        for i, cite in enumerate(cites, 1):
            # Persisted citations may carry explicit nulls for these fields,
            # so fall back on falsy values rather than only on missing keys.
            meta = cite.get("metadata") or {}
            doc = cite.get("document") or ""
            name = meta.get("source_name", "Source")
            page = meta.get("page_or_slide", "")
            snip = doc[:250] + "..." if len(doc) > 250 else doc
            parts.append(f"**[{i}] {name}** (p.{page})\n\n{snip}")
        return "\n\n---\n\n".join(parts)
    return ""
|
|
|
|
| |
def ingest_file_handler(file, notebook_id: str, request: gr.Request) -> str:
    """Ingest an uploaded file into the given notebook.

    Args:
        file: Uploaded file; either an object exposing ``.name`` (the path
            on disk) or a plain path.
        notebook_id: Target notebook id.
        request: Gradio request used to resolve the username.

    Returns:
        A status line for the UI: the ingested file name or a short error.
    """
    if not file or not notebook_id:
        return "Select a notebook and upload a file."
    username = get_username_from_request(request)
    try:
        path = Path(file.name) if hasattr(file, "name") else Path(file)
        ingestion.add_source_file(username, notebook_id, path, path.name)
        return f"Ingested: {path.name}"
    except Exception as e:
        # The full traceback goes to the log only; the UI gets the message,
        # matching ingest_url_handler and avoiding leaking server internals.
        logger.exception("ingest_file")
        return f"Error: {e}"
|
|
|
|
def ingest_url_handler(url: str, notebook_id: str, request: gr.Request) -> str:
    """Ingest the content behind a URL into the given notebook.

    Args:
        url: URL to fetch; surrounding whitespace is ignored.
        notebook_id: Target notebook id.
        request: Gradio request used to resolve the username.

    Returns:
        A status line for the UI: the (possibly shortened) URL or an error.
    """
    cleaned = (url or "").strip()
    if not cleaned or not notebook_id:
        return "Enter URL and select a notebook."
    username = get_username_from_request(request)
    try:
        ingestion.add_source_url(username, notebook_id, cleaned)
        # Only add an ellipsis when the URL was actually shortened.
        shown = cleaned if len(cleaned) <= 60 else cleaned[:60] + "..."
        return f"Ingested URL: {shown}"
    except Exception as e:
        logger.exception("ingest_url")
        return f"Error: {e}"
|
|
|
|
def refresh_sources_handler(notebook_id: str, request: gr.Request) -> Tuple[str, str, Any]:
    """List the notebook's sources for the sidebar.

    Args:
        notebook_id: Notebook whose sources are listed.
        request: Gradio request used to resolve the username.

    Returns:
        ``(markdown_list, count_text, dropdown)``. Each source is shown with
        a check or cross mark for its ``enabled`` flag; dropdown values are
        the source ids and the first source is preselected.
    """
    if not notebook_id:
        return "No notebook selected.", "", gr.Dropdown(choices=[], value=None)

    username = get_username_from_request(request)
    sources = ingestion.list_sources(username, notebook_id)

    options = []
    for src in sources:
        mark = "✓" if src.get("enabled", True) else "✗"
        label = (src.get("filename", "?") or "?")[:60]
        options.append((f"{mark} {label}", src.get("id", "")))

    if options:
        listing = "\n".join(f"- {text}" for text, _ in options)
        selected = options[0][1]
    else:
        listing = "No sources. Upload files or add a URL."
        selected = None
    return listing, f"{len(sources)} source(s)", gr.Dropdown(choices=options, value=selected)
|
|
|
|
def toggle_source_handler(notebook_id: str, source_id: str, enabled: bool, request: gr.Request) -> str:
    """Enable or disable one source of a notebook.

    Args:
        notebook_id: Notebook that owns the source.
        source_id: Id of the source to update.
        enabled: New enabled flag.
        request: Gradio request used to resolve the username.

    Returns:
        ``"Updated."`` on success, otherwise a short explanation.
    """
    if not notebook_id or not source_id:
        return "Missing notebook or source."
    username = get_username_from_request(request)
    try:
        ingestion.set_source_enabled(username, notebook_id, source_id, enabled)
        return "Updated."
    except Exception as e:
        # Record the failure like the other handlers instead of only
        # surfacing it in the status box.
        logger.exception("toggle_source")
        return f"Error: {e}"
|
|
|
|
| |
def chat_send(
    message: str,
    history: List,
    notebook_id: str,
    strategy: str,
    request: gr.Request,
) -> Tuple[List, str, str, str, str]:
    """Answer a chat message with RAG and persist both sides of the turn.

    Args:
        message: The user's question.
        history: Current chatbot history as ``(user, assistant)`` tuples;
            ``None`` is treated as empty.
        notebook_id: Notebook to query.
        strategy: Retrieval strategy passed through to ``rag.answer``.
        request: Gradio request used to resolve the username.

    Returns:
        ``(history, citations_markdown, cleared_input, timing, error)``.
        ``error`` is ``""`` on success; on failure the history is returned
        unchanged.
    """
    # The chatbot value may be None before the first turn.
    history = history or []
    if not message or not message.strip() or not notebook_id:
        return history, "", "", "", "Select a notebook and type a message."
    username = get_username_from_request(request)
    path = chat_messages_path(username, notebook_id)
    append_jsonl(path, {"role": "user", "content": message, "ts": _now_iso()})
    try:
        answer_text, citations, ret_time, gen_time = rag.answer(
            username, notebook_id, message, strategy=strategy, top_k=TOP_K
        )
        append_jsonl(
            path,
            {
                "role": "assistant",
                "content": answer_text,
                "ts": _now_iso(),
                "citations": [
                    {"document": c.get("document"), "metadata": c.get("metadata"), "id": c.get("id")}
                    for c in citations
                ],
            },
        )
        history = history + [(message, answer_text)]
        cite_block = _format_citations(citations)
        timing = f"Retrieval: {ret_time:.2f}s | Generation: {gen_time:.2f}s"
        return history, cite_block, "", timing, ""
    except Exception as e:
        # The traceback is logged; the UI only gets the message so server
        # internals are not shown to end users.
        logger.exception("chat_send")
        return history, "", "", "", f"Error: {e}"
|
|
|
|
def _format_citations(citations: List[Dict]) -> str:
    """Render retrieved citations as markdown for the citations panel.

    Args:
        citations: Citation dicts with optional ``document`` and
            ``metadata`` (``source_name``, ``page_or_slide``) entries.

    Returns:
        One numbered entry per citation (snippets cut to 300 characters),
        separated by horizontal rules; ``""`` when there are no citations.
    """
    if not citations:
        return ""
    parts = []
    for i, cite in enumerate(citations, 1):
        # Tolerate explicit None values as well as missing keys.
        meta = cite.get("metadata") or {}
        doc = cite.get("document") or ""
        name = meta.get("source_name", "Source")
        page = meta.get("page_or_slide", "")
        snip = doc[:300] + "..." if len(doc) > 300 else doc
        parts.append(f"[{i}] **{name}** (p.{page})\n\n{snip}")
    return "\n\n---\n\n".join(parts)
|
|
|
|
def clear_chat_handler(notebook_id: str, request: gr.Request) -> Tuple[List, str]:
    """Erase the stored chat of a notebook and clear the chat widgets.

    Args:
        notebook_id: Notebook whose chat log is truncated; nothing is
            touched when it is empty.
        request: Gradio request used to resolve the username.

    Returns:
        ``([], "")`` — an empty chat history and empty citations text.
    """
    if notebook_id:
        username = get_username_from_request(request)
        chat_log = chat_messages_path(username, notebook_id)
        # Truncate rather than delete; a missing file is left missing.
        if chat_log.exists():
            chat_log.write_text("")
    return [], ""
|
|
|
|
| |
|
|
|
|
def artifact_quiz_click(notebook_id: str, extra: str, strategy: str, request: gr.Request) -> Tuple[str, Any, str]:
    """Generate a quiz for the notebook and refresh the quiz dropdown.

    Args:
        notebook_id: Notebook to generate from.
        extra: Optional extra instruction for the generator.
        strategy: Retrieval strategy passed through to the generator.
        request: Gradio request used to resolve the username.

    Returns:
        ``(status, dropdown, quiz_content)``. The dropdown is left untouched
        and the content is ``""`` when generation fails.
    """
    if not notebook_id:
        return "Select a notebook first.", gr.update(), ""
    username = get_username_from_request(request)
    try:
        out = artifacts_module.generate_quiz(username, notebook_id, extra_instruction=extra or "", strategy=strategy)
        if out.get("error"):
            return out["error"], gr.update(), ""
        arts = artifacts_module.list_artifacts(username, notebook_id)
        quizzes = [a for a in arts if a.get("type") == "quiz"]
        choices = [(a.get("filename", "?"), a.get("filename", "")) for a in quizzes]
        return f"Quiz generated: {out['filename']}", gr.Dropdown(choices=choices, value=out["filename"]), out["content"]
    except Exception as e:
        # Full traceback goes to the log; the UI only shows the message.
        logger.exception("quiz")
        return f"Error: {e}", gr.update(), ""
|
|
|
|
def artifact_podcast_click(notebook_id: str, extra: str, strategy: str, request: gr.Request) -> Tuple[str, Any, str, Any]:
    """Generate a podcast (transcript and audio) for the notebook.

    Args:
        notebook_id: Notebook to generate from.
        extra: Optional extra instruction for the generator.
        strategy: Retrieval strategy passed through to the generator.
        request: Gradio request used to resolve the username.

    Returns:
        ``(status, dropdown, transcript, audio_path)``. ``audio_path`` is
        ``None`` when the generator did not report ``audio_ok``.
    """
    if not notebook_id:
        return "Select a notebook first.", gr.update(), "", None
    username = get_username_from_request(request)
    try:
        out = artifacts_module.generate_podcast(username, notebook_id, extra_instruction=extra or "", strategy=strategy)
        if out.get("error"):
            return out["error"], gr.update(), "", None
        arts = artifacts_module.list_artifacts(username, notebook_id)
        podcasts = [a for a in arts if a.get("type") == "podcast"]
        choices = [(a.get("filename", "?"), a.get("filename", "")) for a in podcasts]
        transcript = out.get("transcript_content", "")
        audio_ok = out.get("audio_ok")
        audio_path = artifacts_module.get_podcast_audio_path(username, notebook_id, out["filename"]) if audio_ok else None
        status = f"Podcast generated: {out['filename']}" + ("" if audio_ok else " (audio failed; transcript saved)")
        return (
            status,
            gr.Dropdown(choices=choices, value=out["filename"]),
            transcript,
            audio_path,
        )
    except Exception as e:
        # Full traceback goes to the log; the UI only shows the message.
        logger.exception("podcast")
        return f"Error: {e}", gr.update(), "", None
|
|
|
|
def list_artifacts_handler(notebook_id: str, request: gr.Request) -> Tuple[str, List[Dict]]:
    """List the notebook's artifacts as markdown bullets.

    Args:
        notebook_id: Notebook whose artifacts are listed.
        request: Gradio request used to resolve the username.

    Returns:
        ``(markdown, artifacts)`` — one ``- **type**: filename`` bullet per
        artifact plus the raw artifact records.
    """
    if not notebook_id:
        return "Select a notebook.", []

    username = get_username_from_request(request)
    arts = artifacts_module.list_artifacts(username, notebook_id)
    bullets = [f"- **{art.get('type', '?')}**: {art.get('filename', '?')}" for art in arts]
    listing = "\n".join(bullets) if bullets else "No artifacts yet."
    return listing, arts
|
|
|
|
def view_report_content(notebook_id: str, filename: str, request: gr.Request) -> str:
    """Return the stored content of a report, or ``""`` if nothing is selected.

    Args:
        notebook_id: Notebook that owns the report.
        filename: Report file to read.
        request: Gradio request used to resolve the username.
    """
    if notebook_id and filename:
        username = get_username_from_request(request)
        return artifacts_module.get_report_content(username, notebook_id, filename)
    return ""
|
|
|
|
def view_quiz_content(notebook_id: str, filename: str, request: gr.Request) -> str:
    """Return the stored content of a quiz, or ``""`` if nothing is selected.

    Args:
        notebook_id: Notebook that owns the quiz.
        filename: Quiz file to read.
        request: Gradio request used to resolve the username.
    """
    if notebook_id and filename:
        username = get_username_from_request(request)
        return artifacts_module.get_quiz_content(username, notebook_id, filename)
    return ""
|
|
|
|
def view_podcast_content(notebook_id: str, filename: str, request: gr.Request) -> Tuple[str, Any]:
    """Look up a podcast artifact and return its transcript and audio path.

    Args:
        notebook_id: Notebook that owns the podcast.
        filename: Podcast file name as recorded in the artifact list.
        request: Gradio request used to resolve the username.

    Returns:
        ``(transcript, audio_path)``; ``("", None)`` when nothing is selected
        or no podcast artifact has that file name. The transcript is ``""``
        when the artifact records no ``transcript_filename``.
    """
    if not (notebook_id and filename):
        return "", None

    username = get_username_from_request(request)
    arts = artifacts_module.list_artifacts(username, notebook_id)
    # First podcast entry with a matching file name wins.
    entry = next(
        (art for art in arts if art.get("type") == "podcast" and art.get("filename") == filename),
        None,
    )
    if entry is None:
        return "", None

    transcript_name = entry.get("transcript_filename")
    transcript = (
        artifacts_module.get_podcast_transcript(username, notebook_id, transcript_name)
        if transcript_name
        else ""
    )
    audio_path = artifacts_module.get_podcast_audio_path(username, notebook_id, filename)
    return transcript, audio_path
|
|
|
|
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix."""
    from datetime import datetime, timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop tzinfo so the text keeps the "...Z" form instead of
    # gaining a "+00:00" offset.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
|
|
|
|
| |
# Stylesheet handed to gr.Blocks(css=...) in build_ui. It overrides the Gradio
# theme with dark backgrounds and indigo accents; most rules are scoped under
# .gradio-container and marked !important. The font stack names
# 'Plus Jakarta Sans', which build_ui loads through the Blocks ``head`` markup.
# The #sidebar, #user-display and #chat-error-box selectors match the elem_id
# values assigned in build_ui.
CUSTOM_CSS = """
/* Base: professional font and smooth rendering */
.gradio-container {
font-family: 'Plus Jakarta Sans', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif !important;
}
.gradio-container .block, .gradio-container label, .gradio-container input,
.gradio-container textarea, .gradio-container button, .gradio-container .markdown {
font-family: 'Plus Jakarta Sans', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif !important;
}
.gradio-container {
background: #0e0e14 !important;
font-size: 15px !important;
line-height: 1.5 !important;
}
.gradio-container .main {
background: #12121a !important;
max-width: 1400px !important;
margin: 0 auto !important;
}
.gradio-container .block {
background: transparent !important;
border: none !important;
padding: 0 !important;
}

/* Headings */
.gradio-container h1, .gradio-container .markdown h1 {
font-size: 1.5rem !important;
font-weight: 700 !important;
letter-spacing: -0.02em !important;
color: #f0f0f5 !important;
margin-bottom: 0.5rem !important;
}
.gradio-container .markdown p, .gradio-container .markdown {
color: #a0a0b0 !important;
font-size: 0.9375rem !important;
font-weight: 400 !important;
}

/* Labels: subtle, not loud */
.gradio-container label, .gradio-container .label-wrap {
color: #9090a0 !important;
font-size: 0.8125rem !important;
font-weight: 500 !important;
text-transform: none !important;
}

/* Inputs: clean, rounded */
.gradio-container .input, .gradio-container .output,
.gradio-container .textbox textarea, .gradio-container input[type="text"] {
background: #1c1c26 !important;
color: #e8e8f0 !important;
border: 1px solid #2d2d3a !important;
border-radius: 10px !important;
padding: 10px 14px !important;
font-size: 0.9375rem !important;
transition: border-color 0.15s ease, box-shadow 0.15s ease !important;
}
.gradio-container .input:focus-within,
.gradio-container .textbox textarea:focus,
.gradio-container input[type="text"]:focus {
border-color: #6366f1 !important;
box-shadow: 0 0 0 3px rgba(99, 102, 241, 0.15) !important;
outline: none !important;
}

/* Buttons: primary = indigo/violet, refined */
.gradio-container .primary {
background: linear-gradient(180deg, #6366f1 0%, #4f46e5 100%) !important;
color: #fff !important;
border: none !important;
border-radius: 10px !important;
font-weight: 600 !important;
font-size: 0.9375rem !important;
padding: 10px 18px !important;
box-shadow: 0 1px 2px rgba(0,0,0,0.2) !important;
transition: transform 0.05s ease, box-shadow 0.15s ease !important;
}
.gradio-container .primary:hover {
background: linear-gradient(180deg, #5558e3 0%, #4338ca 100%) !important;
box-shadow: 0 4px 12px rgba(99, 102, 241, 0.35) !important;
}
.gradio-container button:not(.primary) {
background: #252532 !important;
color: #c0c0d0 !important;
border: 1px solid #2d2d3a !important;
border-radius: 10px !important;
font-weight: 500 !important;
}
.gradio-container button:not(.primary):hover {
background: #2d2d3a !important;
color: #e0e0e8 !important;
}
.gradio-container .stop { background: #2d2020 !important; color: #e8a0a0 !important; border-color: #4a3030 !important; }
.gradio-container .stop:hover { background: #3d2828 !important; }

/* Sidebar: card-like sections */
#sidebar {
background: #16161f !important;
border-radius: 12px !important;
padding: 20px !important;
border: 1px solid #22222e !important;
}
#sidebar .block { background: transparent !important; margin-bottom: 16px !important; }
#sidebar .markdown { color: #a0a0b0 !important; }
#user-display {
font-size: 0.8125rem !important;
color: #707080 !important;
padding: 8px 0 !important;
border-bottom: 1px solid #22222e !important;
margin-bottom: 12px !important;
}

/* Dropdown */
.gradio-container .dropdown, .gradio-container select {
background: #1c1c26 !important;
color: #e8e8f0 !important;
border: 1px solid #2d2d3a !important;
border-radius: 10px !important;
font-size: 0.9375rem !important;
}

/* Tabs: clean pill-style */
.gradio-container .tabs {
margin-bottom: 20px !important;
}
.gradio-container .tabs .tabitem {
background: transparent !important;
padding: 16px 0 !important;
}
.gradio-container .tabs button {
color: #808090 !important;
font-weight: 500 !important;
font-size: 0.9375rem !important;
border-radius: 8px !important;
padding: 8px 16px !important;
margin-right: 4px !important;
border: none !important;
background: transparent !important;
}
.gradio-container .tabs button.selected {
background: #252532 !important;
color: #e0e0e8 !important;
}

/* Chatbot */
.gradio-container .chatbot {
background: #1c1c26 !important;
border: 1px solid #2d2d3a !important;
border-radius: 12px !important;
}

/* File upload area */
.gradio-container .file-preview, .gradio-container .upload {
background: #1c1c26 !important;
border: 2px dashed #2d2d3a !important;
border-radius: 12px !important;
color: #808090 !important;
}
.gradio-container .file-preview:hover, .gradio-container .upload:hover {
border-color: #6366f1 !important;
background: #1e1e2a !important;
}

/* Status / read-only text: subtle, not error-like */
.gradio-container .input[data-type="textbox"] textarea:disabled,
.gradio-container input:disabled {
background: #181822 !important;
color: #9090a0 !important;
border-color: #252532 !important;
}

/* Error message area: only show when needed, subtle */
#chat-error-box {
font-size: 0.8125rem !important;
color: #c08080 !important;
background: #1e1818 !important;
border: 1px solid #3d2828 !important;
border-radius: 8px !important;
padding: 10px 12px !important;
}
#chat-error-box:empty, .hide-when-empty:empty { display: none !important; }

/* Accordion */
.gradio-container .accordion {
background: #1c1c26 !important;
border: 1px solid #2d2d3a !important;
border-radius: 10px !important;
}
"""
|
|
|
|
| |
def build_ui():
    """Assemble the Gradio Blocks app: sidebar, main tabs and event wiring.

    The layout is a sidebar (notebook selection/management and source
    toggles) next to three tabs: Sources, Chat and Artifacts. Every event
    handler resolves the username from the incoming request, so artifacts
    are always generated for the authenticated user.

    Returns:
        The configured ``gr.Blocks`` instance (not launched).
    """
    with gr.Blocks(
        title="NotebookLM Clone",
        theme=gr.themes.Soft(primary_hue="violet"),
        css=CUSTOM_CSS,
        head="""
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Plus+Jakarta+Sans:wght@400;500;600;700&display=swap" rel="stylesheet">
""",
    ) as demo:
        # A callable is passed (not a dict) so the initial value is built by
        # calling initial_state rather than sharing one module-level dict.
        state = gr.State(value=initial_state)
        # Hidden carrier for the active notebook id; most handlers read it.
        notebook_id_hidden = gr.Textbox(visible=False)

        with gr.Row():
            # ---- Sidebar: notebook selection, management, source toggles ----
            with gr.Column(scale=1, min_width=280, elem_id="sidebar"):
                gr.Markdown("# 📓 NotebookLM Clone")
                user_display = gr.Markdown(value="*Logged in as: …*", elem_id="user-display")
                gr.Markdown("**Select notebook**")
                notebook_dropdown = gr.Dropdown(
                    label="",
                    choices=[],
                    value=None,
                    interactive=True,
                )
                create_name = gr.Textbox(label="New notebook name", placeholder="e.g. Research notes", scale=2)
                create_btn = gr.Button("+ New", variant="primary", scale=1)
                gr.Markdown("**Manage notebook**")
                rename_name = gr.Textbox(label="Rename to", placeholder="New name", show_label=True)
                with gr.Row():
                    rename_btn = gr.Button("Rename")
                    delete_btn = gr.Button("Delete", variant="stop")
                notebook_status = gr.Textbox(label="Status", interactive=False, visible=True, value="Ready.")
                gr.Markdown("---")
                gr.Markdown("**Ingested sources**")
                sources_md = gr.Markdown(value="No sources yet. Use the **Sources** tab to upload or add a URL, then refresh.")
                source_toggle_dropdown = gr.Dropdown(label="Source to toggle", choices=[], value=None)
                with gr.Row():
                    enable_src_btn = gr.Button("Enable", size="sm")
                    disable_src_btn = gr.Button("Disable", size="sm")
                refresh_sources_btn = gr.Button("Refresh sources", size="sm")

            # ---- Main area: Sources / Chat / Artifacts tabs ----
            with gr.Column(scale=3):
                with gr.Tabs() as main_tabs:
                    with gr.TabItem("Sources"):
                        gr.Markdown("Upload PDF, PPTX, or TXT files, or paste a URL to add content to this notebook.")
                        file_upload = gr.File(
                            label="Upload file",
                            file_types=[".pdf", ".pptx", ".txt"],
                        )
                        with gr.Row():
                            url_input = gr.Textbox(
                                label="Or paste URL",
                                placeholder="https://example.com/article",
                                scale=3,
                            )
                            ingest_url_btn = gr.Button("Ingest URL", variant="primary", scale=1)
                        ingest_btn = gr.Button("Ingest file", variant="primary")
                        ingest_status = gr.Textbox(label="", interactive=False, value="", show_label=False)

                    with gr.TabItem("Chat"):
                        strategy_dropdown = gr.Dropdown(
                            label="Retrieval strategy",
                            choices=[RETRIEVAL_STRATEGY_SIMILARITY, RETRIEVAL_STRATEGY_MMR],
                            value=RETRIEVAL_STRATEGY_SIMILARITY,
                        )
                        chat = gr.Chatbot(label="", height=420, show_label=False)
                        chat_msg = gr.Textbox(
                            label="",
                            placeholder="Ask about your sources…",
                            lines=2,
                            show_label=False,
                        )
                        with gr.Row():
                            send_btn = gr.Button("Send", variant="primary")
                            clear_chat_btn = gr.Button("Clear chat")
                        timing_tb = gr.Textbox(label="Response time", interactive=False, value="")
                        with gr.Accordion("Citations", open=True):
                            citations_display = gr.Markdown(value="")
                        chat_error = gr.Textbox(
                            label="",
                            interactive=False,
                            visible=True,
                            value="",
                            show_label=False,
                            elem_id="chat-error-box",
                        )

                    with gr.TabItem("Artifacts"):
                        artifact_extra = gr.Textbox(
                            label="Extra instruction (optional)",
                            placeholder="e.g. Focus on topic X and how it relates to Y",
                        )
                        artifact_strategy = gr.Dropdown(
                            label="Retrieval strategy",
                            choices=[RETRIEVAL_STRATEGY_SIMILARITY, RETRIEVAL_STRATEGY_MMR],
                            value=RETRIEVAL_STRATEGY_SIMILARITY,
                        )
                        with gr.Tabs() as artifact_tabs:
                            with gr.TabItem("Reports"):
                                report_btn = gr.Button("Generate report", variant="primary")
                                report_status = gr.Textbox(label="", interactive=False, value="", show_label=False)
                                report_files_dropdown = gr.Dropdown(
                                    label="Report files",
                                    choices=[],
                                    value=None,
                                )
                                report_content_md = gr.Markdown(
                                    value="*Generate a report or select one above to view.*",
                                    elem_classes=["report-display"],
                                )

                            with gr.TabItem("Quizzes"):
                                quiz_btn = gr.Button("Generate quiz", variant="primary")
                                quiz_status = gr.Textbox(label="", interactive=False, value="", show_label=False)
                                quiz_files_dropdown = gr.Dropdown(
                                    label="Quiz files",
                                    choices=[],
                                    value=None,
                                )
                                quiz_content_md = gr.Markdown(
                                    value="*Generate a quiz or select one above to view.*",
                                )

                            with gr.TabItem("Podcasts"):
                                podcast_btn = gr.Button("Generate podcast (transcript + MP3)", variant="primary")
                                podcast_status = gr.Textbox(label="", interactive=False, value="", show_label=False)
                                podcast_files_dropdown = gr.Dropdown(
                                    label="Podcast files",
                                    choices=[],
                                    value=None,
                                )
                                podcast_audio = gr.Audio(label="Play podcast", type="filepath")
                                podcast_transcript_md = gr.Markdown(
                                    value="*Generate a podcast or select one above to view transcript.*",
                                )
                        gr.Markdown("### Your artifacts")
                        artifacts_list_md = gr.Markdown(value="No artifacts yet. Generate a report, quiz, or podcast above.")
                        artifacts_list_btn = gr.Button("Refresh list")

        # ------------------------------------------------------------------
        # Session start and notebook selection
        # ------------------------------------------------------------------
        def on_load(request: gr.Request):
            """Populate notebooks, chat history and the user label on page load."""
            username = get_username_from_request(request)
            state_val, nb_id, status, dd = list_notebooks_for_user(request)
            user_md = f"**Logged in as:** {username}"
            if nb_id:
                _, _, chat_hist, cites = switch_notebook_handler(nb_id, request)
                return state_val, nb_id, status, dd, chat_hist, cites, user_md
            return state_val, nb_id, status, dd, [], "", user_md

        demo.load(
            fn=on_load,
            inputs=[],
            outputs=[state, notebook_id_hidden, notebook_status, notebook_dropdown, chat, citations_display, user_display],
        )

        def on_notebook_select(choice, request: gr.Request):
            """Map the selected notebook name to its id and switch to it."""
            username = get_username_from_request(request)
            notebooks = notebooks_module.list_notebooks(username)
            # The dropdown holds names; the first notebook with that name wins.
            nb_id = next((n.get("id", "") for n in notebooks if n.get("name") == choice), "")
            return switch_notebook_handler(nb_id, request)

        notebook_dropdown.change(
            fn=on_notebook_select,
            inputs=[notebook_dropdown],
            outputs=[state, notebook_id_hidden, chat, citations_display],
        )

        create_btn.click(
            fn=create_notebook_handler,
            inputs=[create_name],
            outputs=[state, notebook_id_hidden, notebook_status, notebook_dropdown],
        )
        delete_btn.click(
            fn=delete_notebook_handler,
            inputs=[notebook_id_hidden],
            outputs=[state, notebook_id_hidden, notebook_status, notebook_dropdown],
        )
        rename_btn.click(
            fn=rename_notebook_handler,
            inputs=[notebook_id_hidden, rename_name],
            outputs=[notebook_status, notebook_dropdown],
        )

        # ------------------------------------------------------------------
        # Sources
        # ------------------------------------------------------------------
        ingest_btn.click(
            fn=ingest_file_handler,
            inputs=[file_upload, notebook_id_hidden],
            outputs=[ingest_status],
        )
        ingest_url_btn.click(
            fn=ingest_url_handler,
            inputs=[url_input, notebook_id_hidden],
            outputs=[ingest_status],
        )
        url_input.submit(
            fn=ingest_url_handler,
            inputs=[url_input, notebook_id_hidden],
            outputs=[ingest_status],
        )
        refresh_sources_btn.click(
            fn=refresh_sources_handler,
            inputs=[notebook_id_hidden],
            outputs=[sources_md, notebook_status, source_toggle_dropdown],
        )

        def _set_source_enabled(nid, sid, enabled, request):
            """Toggle a source and report the real outcome of the update."""
            if not sid:
                return "Select a source."
            result = toggle_source_handler(nid, sid, enabled, request)
            # Anything other than the success marker is an error or a
            # validation message and must reach the user unchanged.
            if result != "Updated.":
                return result
            return "Enabled." if enabled else "Disabled."

        def do_enable(nid, sid, request: gr.Request):
            return _set_source_enabled(nid, sid, True, request)

        def do_disable(nid, sid, request: gr.Request):
            return _set_source_enabled(nid, sid, False, request)

        enable_src_btn.click(
            fn=do_enable,
            inputs=[notebook_id_hidden, source_toggle_dropdown],
            outputs=[notebook_status],
        )
        disable_src_btn.click(
            fn=do_disable,
            inputs=[notebook_id_hidden, source_toggle_dropdown],
            outputs=[notebook_status],
        )

        # ------------------------------------------------------------------
        # Chat
        # ------------------------------------------------------------------
        send_btn.click(
            fn=chat_send,
            inputs=[chat_msg, chat, notebook_id_hidden, strategy_dropdown],
            outputs=[chat, citations_display, chat_msg, timing_tb, chat_error],
        )
        chat_msg.submit(
            fn=chat_send,
            inputs=[chat_msg, chat, notebook_id_hidden, strategy_dropdown],
            outputs=[chat, citations_display, chat_msg, timing_tb, chat_error],
        )
        clear_chat_btn.click(
            fn=clear_chat_handler,
            inputs=[notebook_id_hidden],
            outputs=[chat, citations_display],
        )

        # ------------------------------------------------------------------
        # Artifacts
        # ------------------------------------------------------------------
        def _artifact_choices(username, nid, kind):
            """Return dropdown choices for the notebook's artifacts of one type."""
            arts = artifacts_module.list_artifacts(username, nid)
            return [
                (a.get("filename", "?"), a.get("filename", ""))
                for a in arts
                if a.get("type") == kind
            ]

        def _generated_status(label, out):
            """Build the '<label> generated: <file> (<secs>s)' status line."""
            timing = out.get("generation_time")
            suffix = f" ({timing:.1f}s)" if timing is not None else ""
            return f"{label} generated: {out['filename']}{suffix}"

        def do_report(nid, ex, strat, request: gr.Request):
            """Generate a report for the authenticated user's notebook."""
            if not nid:
                return "Select a notebook first.", gr.update(), ""
            # Username comes from the request, never from client-held state.
            username = get_username_from_request(request)
            try:
                out = artifacts_module.generate_report(username, nid, extra_instruction=ex or "", strategy=strat)
                if out.get("error"):
                    return out["error"], gr.update(), ""
                choices = _artifact_choices(username, nid, "report")
                status = _generated_status("Report", out)
                return status, gr.Dropdown(choices=choices, value=out["filename"]), out["content"]
            except Exception as e:
                logger.exception("report")
                return f"Error: {e}", gr.update(), ""

        report_btn.click(
            fn=do_report,
            inputs=[notebook_id_hidden, artifact_extra, artifact_strategy],
            outputs=[report_status, report_files_dropdown, report_content_md],
        )
        report_files_dropdown.change(
            fn=view_report_content,
            inputs=[notebook_id_hidden, report_files_dropdown],
            outputs=[report_content_md],
        )

        def do_quiz(nid, ex, strat, request: gr.Request):
            """Generate a quiz for the authenticated user's notebook."""
            if not nid:
                return "Select a notebook first.", gr.update(), ""
            username = get_username_from_request(request)
            try:
                out = artifacts_module.generate_quiz(username, nid, extra_instruction=ex or "", strategy=strat)
                if out.get("error"):
                    return out["error"], gr.update(), ""
                choices = _artifact_choices(username, nid, "quiz")
                status = _generated_status("Quiz", out)
                return status, gr.Dropdown(choices=choices, value=out["filename"]), out["content"]
            except Exception as e:
                logger.exception("quiz")
                return f"Error: {e}", gr.update(), ""

        quiz_btn.click(
            fn=do_quiz,
            inputs=[notebook_id_hidden, artifact_extra, artifact_strategy],
            outputs=[quiz_status, quiz_files_dropdown, quiz_content_md],
        )
        quiz_files_dropdown.change(
            fn=view_quiz_content,
            inputs=[notebook_id_hidden, quiz_files_dropdown],
            outputs=[quiz_content_md],
        )

        def do_podcast(nid, ex, strat, request: gr.Request):
            """Generate a podcast for the authenticated user's notebook."""
            if not nid:
                return "Select a notebook first.", gr.update(), "", None
            username = get_username_from_request(request)
            try:
                out = artifacts_module.generate_podcast(username, nid, extra_instruction=ex or "", strategy=strat)
                if out.get("error"):
                    return out["error"], gr.update(), "", None
                choices = _artifact_choices(username, nid, "podcast")
                transcript = out.get("transcript_content", "")
                audio_ok = out.get("audio_ok")
                audio_path = artifacts_module.get_podcast_audio_path(username, nid, out["filename"]) if audio_ok else None
                msg = _generated_status("Podcast", out)
                if not audio_ok:
                    msg += " (audio failed; transcript saved)"
                return msg, gr.Dropdown(choices=choices, value=out["filename"]), transcript, audio_path
            except Exception as e:
                logger.exception("podcast")
                return f"Error: {e}", gr.update(), "", None

        podcast_btn.click(
            fn=do_podcast,
            inputs=[notebook_id_hidden, artifact_extra, artifact_strategy],
            outputs=[podcast_status, podcast_files_dropdown, podcast_transcript_md, podcast_audio],
        )
        podcast_files_dropdown.change(
            fn=view_podcast_content,
            inputs=[notebook_id_hidden, podcast_files_dropdown],
            outputs=[podcast_transcript_md, podcast_audio],
        )

        def refresh_artifacts(nid, request: gr.Request):
            """Return only the markdown listing; the raw records are not shown."""
            md, _ = list_artifacts_handler(nid, request)
            return md

        artifacts_list_btn.click(
            fn=refresh_artifacts,
            inputs=[notebook_id_hidden],
            outputs=[artifacts_list_md],
        )

    return demo
|
|
|
|
| |
# Module-level app object, built at import time so it can be referenced as
# ``<module>.demo`` without running this file as a script.
demo = build_ui()


if __name__ == "__main__":
    import os

    # Default user for direct local runs (MOCK_USER is presumably read by
    # backend.auth — confirm). Assign directly: the guard treats an empty
    # value as unset, and setdefault() would leave an existing empty string
    # in place.
    if not os.environ.get("MOCK_USER"):
        os.environ["MOCK_USER"] = "localuser"
    # Listens on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)
|
|