from pathlib import Path import shutil import sys import warnings # Flush print immediately def _log(msg): print(msg, flush=True) _log("1. Loading env...") # Suppress noisy dependency warnings warnings.filterwarnings("ignore", message=".*urllib3.*") warnings.filterwarnings("ignore", message=".*chardet.*") from dotenv import load_dotenv # Load .env from project root (parent of NotebookLM-Clone) so HF_TOKEN etc. are available load_dotenv(Path(__file__).resolve().parent.parent / ".env") load_dotenv(Path(__file__).resolve().parent / ".env") _log("2. Loading Gradio...") from datetime import datetime import gradio as gr _log("2a. Loading gradio_client...") import gradio_client.utils as gradio_client_utils _log("3. Loading backend...") from backend.ingestion_service import ingest_pdf_chunks, ingest_url_chunks, remove_chunks_for_source from backend.notebook_service import create_notebook, list_notebooks, rename_notebook, delete_notebook from backend.podcast_service import generate_podcast, generate_podcast_audio from backend.chat_service import load_chat from backend.rag_service import rag_chat from backend.report_service import generate_report import hashlib _log("4. Imports done.") _original_gradio_get_type = gradio_client_utils.get_type _original_json_schema_to_python_type = gradio_client_utils._json_schema_to_python_type def _patched_gradio_get_type(schema): if isinstance(schema, bool): return "Any" return _original_gradio_get_type(schema) def _patched_json_schema_to_python_type(schema, defs=None): if isinstance(schema, bool): return "Any" return _original_json_schema_to_python_type(schema, defs) gradio_client_utils.get_type = _patched_gradio_get_type gradio_client_utils._json_schema_to_python_type = _patched_json_schema_to_python_type # Theme: adapts to light/dark mode (use default font to avoid network fetch on startup) theme = gr.themes.Soft( primary_hue="blue", secondary_hue="slate", ) CUSTOM_CSS = """ .gradio-container { max-width: 1000px !important; margin: 0 auto !important; } .container { max-width: 1000px; margin: 0 auto; padding: 0 24px; } .header-bar { padding: 12px 0; border-bottom: 1px solid #e2e8f0; margin-bottom: 24px; display: flex !important; justify-content: space-between !important; align-items: center !important; white-space: nowrap; } .login-center { display: flex; justify-content: center; width: 100%; } #auth-text { white-space: nowrap; margin: 8px 0 16px 0; font-size: 0.95rem; opacity: 0.9; } .gr-button { padding: 14px 28px !important; font-size: 0.9rem !important; border-radius: 12px !important; white-space: nowrap !important; width: auto !important; } .gr-button[aria-label*="Logout"] { min-width: auto !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; } .header-bar .gr-button { padding-left: 28px !important; padding-right: 28px !important; min-width: 220px !important; font-size: 0.9rem !important; } #login-btn, #login-btn.gr-button, #login-btn button, #login-btn .gr-button { display: inline-flex !important; flex-direction: row !important; align-items: center !important; justify-content: center !important; gap: 8px !important; width: auto !important; max-width: 100% !important; min-width: 220px !important; overflow: hidden !important; } #login-btn p, #login-btn span, #login-btn .md, #login-btn .md p { margin: 0 !important; font-size: 0.95rem !important; line-height: 1.2 !important; white-space: nowrap !important; overflow: hidden !important; text-overflow: ellipsis !important; } .dark .header-bar { border-bottom: 1px solid #334155; } .hero-section { margin-bottom: 16px; } .login-container { padding: 12px 0; } .create-strip { padding: 18px; border-radius: 16px; } .create-row { display: flex !important; align-items: center !important; gap: 16px !important; } .create-label { white-space: nowrap; font-size: 0.95rem; margin: 0; min-width: 180px; } .create-row .gr-textbox { flex: 1 !important; } .create-row .gr-textbox textarea, .create-row .gr-textbox input { border-radius: 10px !important; } .create-row .gr-button { border-radius: 10px !important; padding: 10px 20px !important; } .hero-title { font-size: 2rem; font-weight: 700; color: #1e293b; margin: 0 0 8px 0; } .hero-sub { font-size: 1rem; color: #64748b; margin: 0; line-height: 1.5; } .section-card { padding: 24px; border-radius: 16px; background: #f8fafc; margin: 24px; box-shadow: 0 2px 8px rgba(0,0,0,0.06); } .notebook-card { padding: 14px 20px; border-radius: 12px; background: #fff; margin: 8px 0 !important; border: 1px solid #e2e8f0; display: flex; align-items: center; gap: 12px; transition: background 0.15s ease; width: 100% !important; box-sizing: border-box !important; overflow: hidden; } .notebook-card:hover { background: #f8fafc; } .notebook-selected { border: 2px solid #3b82f6 !important; box-shadow: none !important; } .manager-card { border-top: 4px solid #3b82f6; } .sources-card { border-top: 4px solid #14b8a6; } .chat-card { border-top: 4px solid #8b5cf6; } .artifacts-card { border-top: 4px solid #f97316; } .section-title { font-size: 1.125rem; font-weight: 600; color: #1e293b; margin: 0 0 16px 0; } .section-row { display: flex !important; align-items: center !important; gap: 16px !important; margin-bottom: 12px; } .section-row .gr-textbox { flex: 1 !important; } .section-row .gr-button { border-radius: 10px !important; padding: 10px 20px !important; } .status { font-size: 0.875rem; color: #64748b; margin: 16px; padding: 12px 16px; background: #f1f5f9; border-radius: 12px; } @media (prefers-color-scheme: dark) { .hero-title { color: #f1f5f9 !important; } .hero-sub { color: #94a3b8 !important; } .section-card { background: #1e293b !important; box-shadow: 0 2px 8px rgba(0,0,0,0.3); } .section-title { color: #f1f5f9 !important; } .notebook-card { background: #334155 !important; border-color: #475569; } .notebook-card:hover { background: #475569 !important; } .notebook-selected { border: 2px solid #60a5fa !important; box-shadow: none !important; } .manager-card { border-top-color: #60a5fa; } .sources-card { border-top-color: #2dd4bf; } .chat-card { border-top-color: #a78bfa; } .artifacts-card { border-top-color: #fb923c; } .status { color: #94a3b8 !important; background: #334155 !important; } } .dark .hero-title { color: #f1f5f9 !important; } .dark .hero-sub { color: #94a3b8 !important; } .dark .section-card { background: #1e293b !important; } .dark .section-title { color: #f1f5f9 !important; } .dark .notebook-card { background: #334155 !important; border-color: #475569; } .dark .notebook-card:hover { background: #475569 !important; } .dark .status { color: #94a3b8 !important; background: #334155 !important; } #delete-btn { border-radius: 16px; background: #FF0000; } #rename-btn { border-radius: 16px; background: #008000; } #select-btn { border-radius: 16px; } #ingest-url-btn { border-radius: 16px; } """ def _user_id(profile: gr.OAuthProfile | None) -> str | None: """Extract user_id from HF OAuth profile. None if not logged in.""" if not profile: return None return ( getattr(profile, "id", None) or getattr(profile, "sub", None) or getattr(profile, "preferred_username", None) or getattr(profile, "username", None) or getattr(profile, "name", None) ) def _get_notebooks(user_id: str | None): if not user_id: return [] return list_notebooks(user_id) def _safe_create(new_name, state, selected_id, profile: gr.OAuthProfile | None = None): """Create notebook with name from text box.""" try: user_id = _user_id(profile) if not user_id: return gr.skip(), gr.skip(), gr.skip(), "Please sign in with Hugging Face" name = (new_name or "").strip() or "Untitled Notebook" nb = create_notebook(user_id, name) if nb: notebooks = _get_notebooks(user_id) new_state = [(n["notebook_id"], n["name"]) for n in notebooks] status = f"Created: {nb['name']}" return "", new_state, nb["notebook_id"], status return gr.skip(), gr.skip(), gr.skip(), "Failed to create" except Exception as e: return gr.skip(), gr.skip(), gr.skip(), f"Error: {e}" def _safe_rename(idx, new_name, state, selected_id, profile: gr.OAuthProfile | None = None): """Rename notebook at index.""" try: if idx is None or idx < 0 or idx >= len(state): return gr.skip(), gr.skip(), "Invalid selection" nb_id, _ = state[idx] name = (new_name or "").strip() if not name: return gr.skip(), gr.skip(), "Enter a name." user_id = _user_id(profile) if not user_id: return gr.skip(), gr.skip(), "Please sign in" ok = rename_notebook(user_id, nb_id, name) if ok: notebooks = _get_notebooks(user_id) new_state = [(n["notebook_id"], n["name"]) for n in notebooks] return new_state, selected_id, f"Renamed to: {name}" return gr.skip(), gr.skip(), "Failed to rename" except Exception as e: return gr.skip(), gr.skip(), f"Error: {e}" def _safe_delete(idx, state, selected_id, profile: gr.OAuthProfile | None = None): """Delete notebook at index.""" try: if idx is None or idx < 0 or idx >= len(state): return gr.skip(), gr.skip(), "Invalid selection" nb_id, _ = state[idx] user_id = _user_id(profile) if not user_id: return gr.skip(), gr.skip(), "Please sign in" ok = delete_notebook(user_id, nb_id) if ok: notebooks = _get_notebooks(user_id) new_state = [(n["notebook_id"], n["name"]) for n in notebooks] new_selected = notebooks[0]["notebook_id"] if notebooks else None return new_state, new_selected, "Notebook deleted" return gr.skip(), gr.skip(), "Failed to delete" except Exception as e: return gr.skip(), gr.skip(), f"Error: {e}" def _initial_load(profile: gr.OAuthProfile | None = None): """Load notebooks on app load. Uses HF OAuth profile for user_id.""" user_id = _user_id(profile) notebooks = _get_notebooks(user_id) state = [(n["notebook_id"], n["name"]) for n in notebooks] selected = notebooks[0]["notebook_id"] if notebooks else None status = f"Signed in as {user_id}" if user_id else "Sign in with Hugging Face to manage notebooks." auth_update = f"You are logged in as {getattr(profile, 'name', None) or user_id} ({_user_id(profile)})" if user_id else "" auth_row_visible = bool(user_id) source_status = "" if user_id else "Sign in with Hugging Face to upload context material." notebook_status_update = gr.update( value="Sign in with Hugging Face to manage notebooks." if not user_id else "", visible=not bool(user_id), ) return ( state, selected, notebook_status_update, auth_update, gr.update(visible=auth_row_visible), gr.update(visible=bool(user_id)), gr.update(visible=not bool(user_id)), source_status, user_id, ) def _selected_notebook_text(selected_id, state) -> str: if not selected_id: return "**Selected notebook:** None" name_map = {str(notebook_id): name for notebook_id, name in (state or [])} name = name_map.get(str(selected_id)) if name: return f"**Selected notebook:** {name}" return "**Selected notebook:** Unknown" REPORT_SCOPE_LABELS = { "All sources (PDFs, URLs, text)": "all", "PDF uploads only": "pdf", "Web URLs only": "url", "Uploaded text only": "text", } REPORT_SCOPE_DESCRIPTIONS = { "all": "PDFs, URLs, and uploaded text", "pdf": "uploaded PDFs", "url": "ingested web URLs", "text": "uploaded text files", } DEFAULT_REPORT_SCOPE_LABEL = "All sources (PDFs, URLs, text)" def _resolve_report_scope(label: str) -> tuple[str, str]: value = REPORT_SCOPE_LABELS.get(label, "all") desc = REPORT_SCOPE_DESCRIPTIONS.get(value, "selected sources") return value, desc def _generate_report(scope_label, notebook_id, profile: gr.OAuthProfile | None): scope_value, scope_desc = _resolve_report_scope(scope_label) user_id = _user_id(profile) if not user_id: return "Please sign in with Hugging Face before generating a report.", "" if not notebook_id: return "Select a notebook first to generate a report.", "" try: report_text = generate_report(notebook_id, scope_value) status = f"Report ready for {scope_desc}." return status, report_text except ValueError as error: return f"⚠️ {error}", "" except Exception as error: return f"Error generating report: {error}", "" def _safe_upload_pdfs(files, selected_id, profile: gr.OAuthProfile | None): """Upload PDF files for the selected notebook.""" try: user_id = _user_id(profile) if not user_id: return "Please sign in with Hugging Face before uploading PDFs." if not selected_id: return "Select a notebook first, then upload PDFs." if not files: return "Choose at least one PDF to upload." if isinstance(files, str): file_paths = [files] else: file_paths = [] for file_item in files: file_path = getattr(file_item, "name", file_item) if file_path: file_paths.append(file_path) if not file_paths: return "No files were received. Try uploading again." target_dir = Path("data") / "uploads" / user_id / str(selected_id) target_dir.mkdir(parents=True, exist_ok=True) uploaded = [] total_chunks = 0 for file_path in file_paths: source_path = Path(file_path) if source_path.suffix.lower() != ".pdf": continue destination = target_dir / source_path.name if destination.exists(): index = 1 while True: candidate = target_dir / f"{source_path.stem}_{index}{source_path.suffix}" if not candidate.exists(): destination = candidate break index += 1 shutil.copy2(source_path, destination) uploaded.append(destination.name) total_chunks += ingest_pdf_chunks(str(selected_id), destination.name, destination) if not uploaded: return "Only .pdf files are allowed." return f"Uploaded {len(uploaded)} PDF(s): {', '.join(uploaded)}. Indexed {total_chunks} chunk(s) for RAG." except Exception as error: return f"Error uploading PDFs: {error}" def _list_uploaded_pdfs(selected_id, profile: gr.OAuthProfile | None = None): """List uploaded PDFs for the selected notebook.""" user_id = _user_id(profile) if not user_id or not selected_id: return gr.update(choices=[], value=None) target_dir = Path("data") / "uploads" / user_id / str(selected_id) if not target_dir.exists(): return gr.update(choices=[], value=None) pdf_names = sorted([path.name for path in target_dir.glob("*.pdf")]) selected_name = pdf_names[0] if pdf_names else None return gr.update(choices=pdf_names, value=selected_name) def _safe_remove_pdf(file_name, selected_id, profile: gr.OAuthProfile | None = None): """Remove one uploaded PDF from the selected notebook.""" try: user_id = _user_id(profile) if not user_id: return "Please sign in with Hugging Face before removing PDFs." if not selected_id: return "Select a notebook first." if not file_name: return "Select a PDF to remove." safe_name = Path(file_name).name target_file = Path("data") / "uploads" / user_id / str(selected_id) / safe_name if not target_file.exists() or target_file.suffix.lower() != ".pdf": return "Selected PDF was not found." target_file.unlink() remove_chunks_for_source(str(selected_id), safe_name) return f"Removed PDF: {safe_name}" except Exception as error: return f"Error removing PDF: {error}" def _url_source_id(url: str) -> str: """Stable source_id so re-ingesting the same URL overwrites old chunks.""" h = hashlib.sha256(url.encode("utf-8", errors="ignore")).hexdigest()[:16] return f"url_{h}" def _safe_ingest_url(url, selected_id, profile: gr.OAuthProfile | None = None): """Ingest one URL into chunks table for the selected notebook.""" try: user_id = _user_id(profile) if not user_id: return "", "Please sign in with Hugging Face before ingesting a URL." if not selected_id: return "", "Select a notebook first, then ingest a URL." cleaned = (url or "").strip() if not cleaned: return "", "Enter a URL." if not (cleaned.startswith("http://") or cleaned.startswith("https://")): return "", "URL must start with http:// or https://" source_id = _url_source_id(cleaned) chunk_count = ingest_url_chunks(str(selected_id), source_id, cleaned) if chunk_count == 0: return "", ( "Ingested URL but extracted 0 chunks. Page may be JS-rendered/blocked/non-text. " "Try a simpler static page (example.com / Wikipedia)." ) return "", f"Ingested URL. Indexed {chunk_count} chunk(s). Source: {cleaned}" except Exception as error: return "", f"Error ingesting URL: {error}" def _safe_remove_url(url, selected_id, profile: gr.OAuthProfile | None = None): try: user_id = _user_id(profile) if not user_id: return "", "Please sign in with Hugging Face before ingesting a URL." if not selected_id: return "", "Select a notebook first, then remove a URL." cleaned = (url or "").strip() if not cleaned: return "", "Enter a URL." if not (cleaned.startswith("http://") or cleaned.startswith("https://")): return "", "URL must start with http:// or https://" source_id = _url_source_id(cleaned) remove_chunks_for_source(str(selected_id), source_id) return "", f"Removed URL: {cleaned}" except Exception as error: return "", f"Error removing URL: {error}" # ── Upload Handler Functions ────────────────────────────────── def _do_upload(text_content, title, notebook_id, profile: gr.OAuthProfile | None): """Handle direct text input and ingestion.""" from backend.ingestion_txt import ingest_txt user_id = _user_id(profile) if not user_id: return "Please sign in first." if not notebook_id: return "Please select a notebook first." if not text_content or not text_content.strip(): return "No text entered." try: filename = (title or "").strip() if not filename: filename = f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}" if not filename.endswith(".txt"): filename = filename + ".txt" file_bytes = text_content.encode("utf-8") result = ingest_txt( file_bytes=file_bytes, filename=filename, notebook_id=notebook_id, user_id=user_id ) meta = result["metadata"] return ( f" **{result['filename']}** saved successfully!\n\n" f"- Size: {meta['size_bytes'] / 1024:.1f} KB" ) except ValueError as e: return f" {str(e)}" except Exception as e: return f"Unexpected error: {str(e)}" def _format_sources(sources: list[dict]) -> str: if not sources: return "No sources yet." lines = ["| Filename | Type | Status | Words |", "|----------|------|--------|-------|"] for s in sources: meta = s.get("metadata") or {} words = meta.get("word_count", "—") lines.append(f"| {s['filename']} | {s['file_type']} | {s['status']} | {words} |") return "\n".join(lines) def _load_sources(notebook_id, profile: gr.OAuthProfile | None): from backend.ingestion_txt import list_sources if not notebook_id: return "" sources = list_sources(notebook_id) return _format_sources(sources) def _safe_generate_podcast(notebook_id, profile: gr.OAuthProfile | None = None): user_id = _user_id(profile) if not user_id: return "Please sign in first.", "" if not notebook_id: return "Please select a notebook first.", "" try: result = generate_podcast(notebook_id=str(notebook_id), user_id=user_id) status = ( f"Podcast generated. Artifact: {result['artifact_id'] or 'saved'} | " f"Sources: {result['sources_count']} | Chunks: {result['chunks_used']}" ) return status, result["script"] except Exception as error: return f"Error generating podcast: {error}", "" def _safe_generate_podcast_audio(notebook_id, script, profile: gr.OAuthProfile | None = None): user_id = _user_id(profile) if not user_id: return "Please sign in first.", None if not notebook_id: return "Please select a notebook first.", None if not script or not script.strip(): return "Generate a podcast script first.", None try: result = generate_podcast_audio(notebook_id=str(notebook_id), user_id=user_id, script=script) status = f"Podcast audio generated. Artifact: {result['artifact_id'] or 'saved'}" return status, result["audio_path"] except Exception as error: return f"Error generating podcast audio: {error}", None # Quiz Handlers def _get_notebook_pdfs(notebook_id, profile: gr.OAuthProfile | None): user_id = _user_id(profile) if not user_id or not notebook_id: return gr.update(choices=[], value=None, visible=False) target_dir = Path("data") / "uploads" / user_id / str(notebook_id) if not target_dir.exists(): return gr.update(choices=[], value=None, visible=False) pdfs = sorted([p.name for p in target_dir.glob("*.pdf")]) return gr.update(choices=pdfs, value=pdfs[0] if pdfs else None, visible=True) def _generate_quiz(notebook_id, source_type, pdf_source_id, profile: gr.OAuthProfile | None): from backend.quiz_service import generate_quiz user_id = _user_id(profile) if not user_id: return "Please sign in first.", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" if not notebook_id: return "Please select a notebook first.", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" type_map = {"Text": "txt", "PDF": "pdf", "URL": "url", "All": "all"} source_type_key = type_map.get(source_type, "all") if source_type_key == "pdf" and not pdf_source_id: return "Pick a PDF first.", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" try: result = generate_quiz(notebook_id, source_type=source_type_key, source_id=pdf_source_id) questions = result["questions"] updates = [] for i in range(5): if i < len(questions): q = questions[i] q_label = f"**Q{i+1}. {q['question']}**" if q["type"] == "multiple_choice": updates += [gr.update(visible=True), gr.update(value=q_label), gr.update(choices=q["options"], value=None, visible=True), gr.update(value="", visible=False)] elif q["type"] == "true_false": updates += [gr.update(visible=True), gr.update(value=q_label), gr.update(choices=["True", "False"], value=None, visible=True), gr.update(value="", visible=False)] else: # change this line for short_answer: updates += [gr.update(visible=True), gr.update(value=q_label), gr.update(choices=[], value=None, visible=False), gr.update(value="", visible=True)] else: updates += [gr.update(visible=False), gr.update(value=""), gr.update(choices=[], value=None, visible=False), gr.update(value="", visible=False)] return "Quiz generated!", questions, *updates, gr.update(visible=True), "" except Exception as e: return f" {e}", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" def _submit_quiz(questions, *answers): if not questions: return " No quiz loaded." score = 0 lines = [] for i, q in enumerate(questions): radio_ans = answers[i] or "" text_ans = answers[i + 5] or "" user_ans = text_ans.strip() if q["type"] == "short_answer" else radio_ans.strip() correct = q["answer"].strip() if not user_ans: is_correct = False elif q["type"] == "multiple_choice": user_letter = user_ans.split(".")[0].strip().upper() correct_letter = correct[0].upper() is_correct = user_letter == correct_letter elif q["type"] == "true_false": is_correct = user_ans.lower() == correct.lower() else: is_correct = user_ans.lower() in correct.lower() or correct.lower() in user_ans.lower() if is_correct: score += 1 lines.append(f"✅ **Q{i+1}**: Correct! *(Answer: {correct})*") else: lines.append(f"❌ **Q{i+1}**: Incorrect. *(Your answer: {user_ans or 'blank'} | Correct: {correct})*") lines.append(f"\n**Score: {score}/{len(questions)}**") return "\n\n".join(lines) def _chat_history_to_pairs(messages: list[dict]) -> list[tuple[str, str]]: """Convert load_chat output to Gradio Chatbot format [(user, assistant), ...].""" pairs = [] i = 0 while i < len(messages): m = messages[i] if m["role"] == "user": user_content = m["content"] or "" asst_content = "" if i + 1 < len(messages) and messages[i + 1]["role"] == "assistant": asst_content = messages[i + 1]["content"] or "" i += 1 pairs.append((user_content, asst_content)) i += 1 return pairs def _load_chat_history(notebook_id, user_id: str | None) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]: """Load chat for notebook. Returns (history_pairs, history_pairs) for State and Chatbot. Only loads if notebook belongs to user (ownership validation). """ if not notebook_id: return [], [] messages = load_chat(notebook_id, user_id) pairs = _chat_history_to_pairs(messages) return pairs, pairs def _format_citations(chunks: list[dict]) -> str: """Format retrieved chunks for citation display.""" if not chunks: return "" lines = ["**Sources cited:**", ""] for i, c in enumerate(chunks, 1): meta = c.get("metadata") or {} source_label = meta.get("url") or meta.get("file_name") or meta.get("file_path") or "Source" content = (c.get("content") or "")[:300] if len(c.get("content") or "") > 300: content += "..." content = content.replace("\n", " ") lines.append(f"**[{i}]** *{source_label}*") lines.append(f"> {content}") lines.append("") return "\n".join(lines) def _on_chat_submit(query, notebook_id, chat_history, profile: gr.OAuthProfile | None): """Handle chat submit: call RAG, return updated history and citations.""" if not notebook_id: return "", chat_history, "Select a notebook first.", "", gr.update(visible=False) if not query or not query.strip(): return "", chat_history, "Enter a message.", "", gr.update(visible=False) user_id = _user_id(profile) if not user_id: return "", chat_history, "Please sign in first.", "", gr.update(visible=False) try: answer, updated, chunks = rag_chat(notebook_id, query.strip(), chat_history, user_id=user_id) citations_md = _format_citations(chunks) accordion_update = gr.update(visible=True) if chunks else gr.update(visible=False) return "", updated, "", citations_md, accordion_update except Exception as e: return "", chat_history, f"Error: {e}", "", gr.update(visible=False) def _get_quiz_pdfs(source_type, notebook_id): if source_type != "PDF": return gr.update(visible=False, choices=[], value=None) if not notebook_id: return gr.update(visible=False, choices=[], value=None) # Search across all users for this notebook_id base = Path("data") / "uploads" pdfs = [] if base.exists(): for user_dir in base.iterdir(): nb_dir = user_dir / str(notebook_id) if nb_dir.exists(): pdfs = sorted([p.name for p in nb_dir.glob("*.pdf")]) break print(f"DEBUG quiz pdfs found: {pdfs}") return gr.update(visible=True, choices=pdfs, value=pdfs[0] if pdfs else None) def _quiz_pdf_dropdown_update(source_type, notebook_id, profile: gr.OAuthProfile | None): if source_type != "PDF": return gr.update(visible=False, choices=[], value=None) if not notebook_id: return gr.update(visible=True, choices=[], value=None) user_id = _user_id(profile) # Try with user_id first (production) if user_id: target_dir = Path("data") / "uploads" / user_id / str(notebook_id) if target_dir.exists(): pdfs = sorted([p.name for p in target_dir.glob("*.pdf")]) return gr.update(visible=True, choices=pdfs, value=pdfs[0] if pdfs else None) # Fallback for local dev (no OAuth): scan all user folders base = Path("data") / "uploads" if base.exists(): for user_dir in base.iterdir(): if not user_dir.is_dir(): continue nb_dir = user_dir / str(notebook_id) if nb_dir.exists(): pdfs = sorted([p.name for p in nb_dir.glob("*.pdf")]) print(f"DEBUG (local fallback): notebook_id={notebook_id}, pdfs={pdfs}") return gr.update(visible=True, choices=pdfs, value=pdfs[0] if pdfs else None) return gr.update(visible=True, choices=[], value=None) def _generate_btn_update(source_type, pdf_name): if source_type == "PDF": return gr.update(interactive=bool(pdf_name)) return gr.update(interactive=True) with gr.Blocks( title="NotebookLM Clone - Notebooks", theme=theme, css=CUSTOM_CSS, ) as demo: with gr.Row(elem_classes=["header-bar"]): gr.Markdown("### 📓 NotebookLM Clone") login_btn = gr.LoginButton(value="Login with Hugging Face", size="lg", elem_id="login-btn") with gr.Row(visible=False) as auth_info_row: auth_text = gr.Markdown("", elem_id="auth-text") gr.HTML("""

📓 NotebookLM Clone

Chat with your documents. Generate reports, quizzes, and podcasts.

""") with gr.Column(visible=False, elem_classes=["login-container"]) as login_container: gr.Markdown("**Sign in with Hugging Face to access your notebooks.**", elem_classes=["login-center"]) with gr.Column(visible=False) as app_content: nb_state = gr.State([]) selected_notebook_id = gr.State(None) chat_history_state = gr.State([]) quiz_state = gr.State([]) user_id_state = gr.State(None) with gr.Group(elem_classes=["section-card", "manager-card"]): gr.Markdown("**Notebook Manager**", elem_classes=["section-title"]) selected_notebook_md = gr.Markdown("**Selected notebook:** None", elem_classes=["status"]) with gr.Group(elem_classes=["create-strip"]): with gr.Row(elem_classes=["create-row"]): gr.Markdown("Create new notebook", elem_classes=["create-label"]) create_txt = gr.Textbox( placeholder="Enter new notebook name", show_label=False, container=False, value="", ) create_btn = gr.Button("Create", variant="primary", size="sm") notebook_status = gr.Markdown("", elem_classes=["status"], visible=False) @gr.render(inputs=[nb_state, selected_notebook_id]) def render_notebooks(state, selected_id): if not state: gr.Markdown("No notebooks yet. Create one to get started.") else: for i, (nb_id, name) in enumerate(state): idx = i is_selected = str(nb_id) == str(selected_id) row_class = ["notebook-card", "notebook-selected"] if is_selected else ["notebook-card"] with gr.Row(elem_classes=row_class): name_txt = gr.Textbox( value=name, show_label=False, scale=4, min_width=240, key=f"nb-name-{nb_id}", ) select_btn = gr.Button( "Selected" if is_selected else "Select", variant="primary" if is_selected else "secondary", scale=1, min_width=90, size="sm", ) rename_btn = gr.Button("Rename", variant="secondary", scale=1, min_width=80, size="sm") delete_btn = gr.Button("Delete", variant="stop", scale=1, min_width=80, size="sm") def on_select(nb_id=nb_id): return nb_id def on_select_status(name=name): return f"Selected notebook: {name}" select_btn.click( on_select, inputs=None, outputs=[selected_notebook_id], api_name=False, ).then(on_select_status, inputs=None, outputs=[notebook_status], api_name=False) rename_btn.click( _safe_rename, inputs=[gr.State(idx), name_txt, nb_state, selected_notebook_id], outputs=[nb_state, selected_notebook_id, notebook_status], api_name=False, ) delete_btn.click( _safe_delete, inputs=[gr.State(idx), nb_state, selected_notebook_id], outputs=[nb_state, selected_notebook_id, notebook_status], api_name=False, ) with gr.Group(elem_classes=["section-card", "sources-card"]): gr.Markdown("**Upload Sources**", elem_classes=["section-title"]) gr.Markdown("*Add PDF, URL, and text content into the selected notebook.*") source_status = gr.Markdown("", elem_classes=["status"]) with gr.Row(elem_classes=["section-row"]): pdf_upload_btn = gr.UploadButton( "Upload PDFs", file_types=[".pdf"], file_count="multiple", type="filepath", variant="secondary", ) with gr.Row(elem_classes=["section-row"]): uploaded_pdf_dd = gr.Dropdown( label="Uploaded PDFs", choices=[], value=None, scale=3, allow_custom_value=False, ) remove_pdf_btn = gr.Button("Remove selected PDF", variant="stop", scale=1) with gr.Row(elem_classes=["section-row"]): url_txt = gr.Textbox( label="Ingest web URL", placeholder="https://example.com", value="", scale=3, ) ingest_url_btn = gr.Button("Ingest URL", variant="primary", scale=1) remove_url_btn = gr.Button("Delete URL", variant="stop", scale=1) gr.Markdown("**Text Source**", elem_classes=["section-title"]) with gr.Row(elem_classes=["section-row"]): txt_title = gr.Textbox( label="Title", placeholder="Give this text a name (e.g. 'Lecture Notes Week 1')", scale=1, ) txt_input = gr.Textbox( label="Text Content", placeholder="Paste or type your text here...", lines=10, ) submit_btn = gr.Button("Save & Process", variant="primary") upload_status = gr.Markdown("", elem_classes=["status"]) sources_display = gr.Markdown("No sources yet.") with gr.Group(elem_classes=["section-card", "chat-card"]): gr.Markdown("**Chat**", elem_classes=["section-title"]) gr.Markdown("*Ask questions about your notebook sources. Answers are grounded in retrieved chunks with citations.*") chatbot = gr.Chatbot(label="Chat history", height=400) chat_input = gr.Textbox( label="Message", placeholder="Ask a question about your sources...", show_label=False, lines=2, ) chat_submit_btn = gr.Button("Send", variant="primary") chat_status = gr.Markdown("", elem_classes=["status"]) citations_display = gr.Accordion("📎 Sources cited (from last response)", open=True, visible=False) with citations_display: citations_md = gr.Markdown("", elem_classes=["status"]) with gr.Group(elem_classes=["section-card", "artifacts-card"]): gr.Markdown("**Artifacts**", elem_classes=["section-title"]) gr.Markdown("**Report**") with gr.Row(elem_classes=["section-row"]): report_scope_dd = gr.Dropdown( label="Report scope", choices=list(REPORT_SCOPE_LABELS.keys()), value=DEFAULT_REPORT_SCOPE_LABEL, scale=3, ) report_btn = gr.Button("Generate report", variant="primary", scale=1) report_status = gr.Markdown("Select a scope and click generate.", elem_classes=["status"]) report_output = gr.Markdown("", elem_id="report-output") gr.Markdown("**Podcast**") with gr.Row(elem_classes=["section-row"]): podcast_btn = gr.Button("Generate Podcast", variant="primary") podcast_audio_btn = gr.Button("Generate Podcast Audio", variant="secondary") podcast_status = gr.Markdown("", elem_classes=["status"]) podcast_script = gr.Markdown("") podcast_audio = gr.Audio(label="Podcast Audio", type="filepath") gr.Markdown("**Quiz**") gr.Markdown("Select a source type then generate a quiz.") quiz_source_type = gr.Radio( choices=["Text", "PDF", "URL", "All"], value="All", label="Source type", ) quiz_pdf_dd = gr.Dropdown( label="Select PDF", choices=[], value=None, visible=False, ) generate_quiz_btn = gr.Button("Generate Quiz", variant="primary") quiz_status = gr.Markdown("") quiz_components = [] for i in range(5): with gr.Group(visible=False) as q_group: q_text = gr.Markdown("") q_radio = gr.Radio(choices=[], label="Your answer", visible=False) q_textbox = gr.Textbox(label="Your answer", visible=False) quiz_components.append({"group": q_group, "text": q_text, "radio": q_radio, "textbox": q_textbox}) submit_quiz_btn = gr.Button("Submit Answers", variant="secondary", visible=False) quiz_results = gr.Markdown("") demo.load( _initial_load, inputs=None, outputs=[nb_state, selected_notebook_id, notebook_status, auth_text, auth_info_row, app_content, login_container, source_status, user_id_state], api_name=False, ) demo.load(_list_uploaded_pdfs, inputs=[selected_notebook_id], outputs=[uploaded_pdf_dd], api_name=False) demo.load(_load_sources, inputs=[selected_notebook_id], outputs=[sources_display], api_name=False) demo.load(_selected_notebook_text, inputs=[selected_notebook_id, nb_state], outputs=[selected_notebook_md], api_name=False) def _on_notebook_select_for_chat(notebook_id, user_id): hist, _ = _load_chat_history(notebook_id, user_id) return hist, hist, "", gr.update(visible=False) selected_notebook_id.change( _on_notebook_select_for_chat, inputs=[selected_notebook_id, user_id_state], outputs=[chat_history_state, chatbot, citations_md, citations_display], api_name=False, ) selected_notebook_id.change(_list_uploaded_pdfs, inputs=[selected_notebook_id], outputs=[uploaded_pdf_dd], api_name=False) selected_notebook_id.change(_load_sources, inputs=[selected_notebook_id], outputs=[sources_display], api_name=False) selected_notebook_id.change(_selected_notebook_text, inputs=[selected_notebook_id, nb_state], outputs=[selected_notebook_md], api_name=False) nb_state.change(_selected_notebook_text, inputs=[selected_notebook_id, nb_state], outputs=[selected_notebook_md], api_name=False) create_btn.click( _safe_create, inputs=[create_txt, nb_state, selected_notebook_id], outputs=[create_txt, nb_state, selected_notebook_id, notebook_status], api_name=False, ).then(_list_uploaded_pdfs, inputs=[selected_notebook_id], outputs=[uploaded_pdf_dd]) pdf_upload_btn.upload( _safe_upload_pdfs, inputs=[pdf_upload_btn, selected_notebook_id], outputs=[source_status], api_name=False, ).then(_list_uploaded_pdfs, inputs=[selected_notebook_id], outputs=[uploaded_pdf_dd]) ingest_url_btn.click( _safe_ingest_url, inputs=[url_txt, selected_notebook_id], outputs=[url_txt, source_status], api_name=False, ) remove_url_btn.click( _safe_remove_url, inputs=[url_txt, selected_notebook_id], outputs=[url_txt, source_status], api_name=False ) remove_pdf_btn.click( _safe_remove_pdf, inputs=[uploaded_pdf_dd, selected_notebook_id], outputs=[source_status], api_name=False, ).then(_list_uploaded_pdfs, inputs=[selected_notebook_id], outputs=[uploaded_pdf_dd]) submit_btn.click( _do_upload, inputs=[txt_input, txt_title, selected_notebook_id], outputs=[upload_status], ).then(_load_sources, inputs=[selected_notebook_id], outputs=[sources_display]) report_btn.click( _generate_report, inputs=[report_scope_dd, selected_notebook_id], outputs=[report_status, report_output], api_name=False, ) podcast_btn.click( _safe_generate_podcast, inputs=[selected_notebook_id], outputs=[podcast_status, podcast_script], api_name=False, ) podcast_audio_btn.click( _safe_generate_podcast_audio, inputs=[selected_notebook_id, podcast_script], outputs=[podcast_status, podcast_audio], api_name=False, ) quiz_source_type.change( _quiz_pdf_dropdown_update, inputs=[quiz_source_type, selected_notebook_id], outputs=[quiz_pdf_dd], api_name=False, ).then( _generate_btn_update, inputs=[quiz_source_type, quiz_pdf_dd], outputs=[generate_quiz_btn], api_name=False, ) quiz_pdf_dd.change( _generate_btn_update, inputs=[quiz_source_type, quiz_pdf_dd], outputs=[generate_quiz_btn], api_name=False, ) quiz_all_outputs = [quiz_status, quiz_state] for c in quiz_components: quiz_all_outputs += [c["group"], c["text"], c["radio"], c["textbox"]] quiz_all_outputs += [submit_quiz_btn, quiz_results] generate_quiz_btn.click( lambda: gr.update(value="Generating quiz..."), inputs=[], outputs=[quiz_status], api_name=False, ).then( _generate_quiz, inputs=[selected_notebook_id, quiz_source_type, quiz_pdf_dd], outputs=quiz_all_outputs, api_name=False, ) submit_quiz_btn.click( _submit_quiz, inputs=[quiz_state] + [c["radio"] for c in quiz_components] + [c["textbox"] for c in quiz_components], outputs=[quiz_results], api_name=False, ) chat_submit_btn.click( _on_chat_submit, inputs=[chat_input, selected_notebook_id, chat_history_state], outputs=[chat_input, chat_history_state, chat_status, citations_md, citations_display], api_name=False, ).then( lambda h: (h, h), inputs=[chat_history_state], outputs=[chat_history_state, chatbot], ) if __name__ == "__main__": _log("5. Launching Gradio...") demo.launch()