from pathlib import Path import shutil import sys import warnings # Flush print immediately def _log(msg): print(msg, flush=True) _log("1. Loading env...") # Suppress noisy dependency warnings warnings.filterwarnings("ignore", message=".*urllib3.*") warnings.filterwarnings("ignore", message=".*chardet.*") from dotenv import load_dotenv # Load .env from project root (parent of NotebookLM-Clone) so HF_TOKEN etc. are available load_dotenv(Path(__file__).resolve().parent.parent / ".env") load_dotenv(Path(__file__).resolve().parent / ".env") _log("2. Loading Gradio...") from datetime import datetime import gradio as gr _log("2a. Loading gradio_client...") import gradio_client.utils as gradio_client_utils _log("3. Loading backend...") from backend.ingestion_service import ingest_pdf_chunks, ingest_url_chunks, remove_chunks_for_source from backend.notebook_service import create_notebook, list_notebooks, rename_notebook, delete_notebook from backend.podcast_service import generate_podcast, generate_podcast_audio from backend.chat_service import load_chat from backend.rag_service import rag_chat from backend.report_service import generate_report import hashlib _log("4. Imports done.") _original_gradio_get_type = gradio_client_utils.get_type _original_json_schema_to_python_type = gradio_client_utils._json_schema_to_python_type def _patched_gradio_get_type(schema): if isinstance(schema, bool): return "Any" return _original_gradio_get_type(schema) def _patched_json_schema_to_python_type(schema, defs=None): if isinstance(schema, bool): return "Any" return _original_json_schema_to_python_type(schema, defs) gradio_client_utils.get_type = _patched_gradio_get_type gradio_client_utils._json_schema_to_python_type = _patched_json_schema_to_python_type # Theme: adapts to light/dark mode (use default font to avoid network fetch on startup) theme = gr.themes.Soft( primary_hue="blue", secondary_hue="slate", ) CUSTOM_CSS = """ .gradio-container { max-width: 1000px !important; margin: 0 auto !important; } .container { max-width: 1000px; margin: 0 auto; padding: 0 24px; } .header-bar { padding: 12px 0; border-bottom: 1px solid #e2e8f0; margin-bottom: 24px; display: flex !important; justify-content: space-between !important; align-items: center !important; white-space: nowrap; } .login-center { display: flex; justify-content: center; width: 100%; } #auth-text { white-space: nowrap; margin: 8px 0 16px 0; font-size: 0.95rem; opacity: 0.9; } .gr-button { padding: 14px 28px !important; font-size: 0.9rem !important; border-radius: 12px !important; white-space: nowrap !important; width: auto !important; } .gr-button[aria-label*="Logout"] { min-width: auto !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; } .header-bar .gr-button { padding-left: 28px !important; padding-right: 28px !important; min-width: 220px !important; font-size: 0.9rem !important; } #login-btn, #login-btn.gr-button, #login-btn button, #login-btn .gr-button { display: inline-flex !important; flex-direction: row !important; align-items: center !important; justify-content: center !important; gap: 8px !important; width: auto !important; max-width: 100% !important; min-width: 220px !important; overflow: hidden !important; } #login-btn p, #login-btn span, #login-btn .md, #login-btn .md p { margin: 0 !important; font-size: 0.95rem !important; line-height: 1.2 !important; white-space: nowrap !important; overflow: hidden !important; text-overflow: ellipsis !important; } .dark .header-bar { border-bottom: 1px solid #334155; } .hero-section { margin-bottom: 16px; } .login-container { padding: 12px 0; } .create-strip { padding: 18px; border-radius: 16px; } .create-row { display: flex !important; align-items: center !important; gap: 16px !important; } .create-label { white-space: nowrap; font-size: 0.95rem; margin: 0; min-width: 180px; } .create-row .gr-textbox { flex: 1 !important; } .create-row .gr-textbox textarea, .create-row .gr-textbox input { border-radius: 10px !important; } .create-row .gr-button { border-radius: 10px !important; padding: 10px 20px !important; } .hero-title { font-size: 2rem; font-weight: 700; color: #1e293b; margin: 0 0 8px 0; } .hero-sub { font-size: 1rem; color: #64748b; margin: 0; line-height: 1.5; } .section-card { padding: 24px; border-radius: 16px; background: #f8fafc; margin: 24px; box-shadow: 0 2px 8px rgba(0,0,0,0.06); } .notebook-card { padding: 14px 20px; border-radius: 12px; background: #fff; margin: 8px 0 !important; border: 1px solid #e2e8f0; display: flex; align-items: center; gap: 12px; transition: background 0.15s ease; width: 100% !important; box-sizing: border-box !important; overflow: hidden; } .notebook-card:hover { background: #f8fafc; } .notebook-selected { border: 2px solid #3b82f6 !important; box-shadow: none !important; } .manager-card { border-top: 4px solid #3b82f6; } .sources-card { border-top: 4px solid #14b8a6; } .chat-card { border-top: 4px solid #8b5cf6; } .artifacts-card { border-top: 4px solid #f97316; } .section-title { font-size: 1.125rem; font-weight: 600; color: #1e293b; margin: 0 0 16px 0; } .section-row { display: flex !important; align-items: center !important; gap: 16px !important; margin-bottom: 12px; } .section-row .gr-textbox { flex: 1 !important; } .section-row .gr-button { border-radius: 10px !important; padding: 10px 20px !important; } .status { font-size: 0.875rem; color: #64748b; margin: 16px; padding: 12px 16px; background: #f1f5f9; border-radius: 12px; } @media (prefers-color-scheme: dark) { .hero-title { color: #f1f5f9 !important; } .hero-sub { color: #94a3b8 !important; } .section-card { background: #1e293b !important; box-shadow: 0 2px 8px rgba(0,0,0,0.3); } .section-title { color: #f1f5f9 !important; } .notebook-card { background: #334155 !important; border-color: #475569; } .notebook-card:hover { background: #475569 !important; } .notebook-selected { border: 2px solid #60a5fa !important; box-shadow: none !important; } .manager-card { border-top-color: #60a5fa; } .sources-card { border-top-color: #2dd4bf; } .chat-card { border-top-color: #a78bfa; } .artifacts-card { border-top-color: #fb923c; } .status { color: #94a3b8 !important; background: #334155 !important; } } .dark .hero-title { color: #f1f5f9 !important; } .dark .hero-sub { color: #94a3b8 !important; } .dark .section-card { background: #1e293b !important; } .dark .section-title { color: #f1f5f9 !important; } .dark .notebook-card { background: #334155 !important; border-color: #475569; } .dark .notebook-card:hover { background: #475569 !important; } .dark .status { color: #94a3b8 !important; background: #334155 !important; } #delete-btn { border-radius: 16px; background: #FF0000; } #rename-btn { border-radius: 16px; background: #008000; } #select-btn { border-radius: 16px; } #ingest-url-btn { border-radius: 16px; } """ def _user_id(profile: gr.OAuthProfile | None) -> str | None: """Extract user_id from HF OAuth profile. None if not logged in.""" if not profile: return None return ( getattr(profile, "id", None) or getattr(profile, "sub", None) or getattr(profile, "preferred_username", None) or getattr(profile, "username", None) or getattr(profile, "name", None) ) def _get_notebooks(user_id: str | None): if not user_id: return [] return list_notebooks(user_id) def _safe_create(new_name, state, selected_id, profile: gr.OAuthProfile | None = None): """Create notebook with name from text box.""" try: user_id = _user_id(profile) if not user_id: return gr.skip(), gr.skip(), gr.skip(), "Please sign in with Hugging Face" name = (new_name or "").strip() or "Untitled Notebook" nb = create_notebook(user_id, name) if nb: notebooks = _get_notebooks(user_id) new_state = [(n["notebook_id"], n["name"]) for n in notebooks] status = f"Created: {nb['name']}" return "", new_state, nb["notebook_id"], status return gr.skip(), gr.skip(), gr.skip(), "Failed to create" except Exception as e: return gr.skip(), gr.skip(), gr.skip(), f"Error: {e}" def _safe_rename(idx, new_name, state, selected_id, profile: gr.OAuthProfile | None = None): """Rename notebook at index.""" try: if idx is None or idx < 0 or idx >= len(state): return gr.skip(), gr.skip(), "Invalid selection" nb_id, _ = state[idx] name = (new_name or "").strip() if not name: return gr.skip(), gr.skip(), "Enter a name." user_id = _user_id(profile) if not user_id: return gr.skip(), gr.skip(), "Please sign in" ok = rename_notebook(user_id, nb_id, name) if ok: notebooks = _get_notebooks(user_id) new_state = [(n["notebook_id"], n["name"]) for n in notebooks] return new_state, selected_id, f"Renamed to: {name}" return gr.skip(), gr.skip(), "Failed to rename" except Exception as e: return gr.skip(), gr.skip(), f"Error: {e}" def _safe_delete(idx, state, selected_id, profile: gr.OAuthProfile | None = None): """Delete notebook at index.""" try: if idx is None or idx < 0 or idx >= len(state): return gr.skip(), gr.skip(), "Invalid selection" nb_id, _ = state[idx] user_id = _user_id(profile) if not user_id: return gr.skip(), gr.skip(), "Please sign in" ok = delete_notebook(user_id, nb_id) if ok: notebooks = _get_notebooks(user_id) new_state = [(n["notebook_id"], n["name"]) for n in notebooks] new_selected = notebooks[0]["notebook_id"] if notebooks else None return new_state, new_selected, "Notebook deleted" return gr.skip(), gr.skip(), "Failed to delete" except Exception as e: return gr.skip(), gr.skip(), f"Error: {e}" def _initial_load(profile: gr.OAuthProfile | None = None): """Load notebooks on app load. Uses HF OAuth profile for user_id.""" user_id = _user_id(profile) notebooks = _get_notebooks(user_id) state = [(n["notebook_id"], n["name"]) for n in notebooks] selected = notebooks[0]["notebook_id"] if notebooks else None status = f"Signed in as {user_id}" if user_id else "Sign in with Hugging Face to manage notebooks." auth_update = f"You are logged in as {getattr(profile, 'name', None) or user_id} ({_user_id(profile)})" if user_id else "" auth_row_visible = bool(user_id) source_status = "" if user_id else "Sign in with Hugging Face to upload context material." notebook_status_update = gr.update( value="Sign in with Hugging Face to manage notebooks." if not user_id else "", visible=not bool(user_id), ) return ( state, selected, notebook_status_update, auth_update, gr.update(visible=auth_row_visible), gr.update(visible=bool(user_id)), gr.update(visible=not bool(user_id)), source_status, user_id, ) def _selected_notebook_text(selected_id, state) -> str: if not selected_id: return "**Selected notebook:** None" name_map = {str(notebook_id): name for notebook_id, name in (state or [])} name = name_map.get(str(selected_id)) if name: return f"**Selected notebook:** {name}" return "**Selected notebook:** Unknown" REPORT_SCOPE_LABELS = { "All sources (PDFs, URLs, text)": "all", "PDF uploads only": "pdf", "Web URLs only": "url", "Uploaded text only": "text", } REPORT_SCOPE_DESCRIPTIONS = { "all": "PDFs, URLs, and uploaded text", "pdf": "uploaded PDFs", "url": "ingested web URLs", "text": "uploaded text files", } DEFAULT_REPORT_SCOPE_LABEL = "All sources (PDFs, URLs, text)" def _resolve_report_scope(label: str) -> tuple[str, str]: value = REPORT_SCOPE_LABELS.get(label, "all") desc = REPORT_SCOPE_DESCRIPTIONS.get(value, "selected sources") return value, desc def _generate_report(scope_label, notebook_id, profile: gr.OAuthProfile | None): scope_value, scope_desc = _resolve_report_scope(scope_label) user_id = _user_id(profile) if not user_id: return "Please sign in with Hugging Face before generating a report.", "" if not notebook_id: return "Select a notebook first to generate a report.", "" try: report_text = generate_report(notebook_id, scope_value) status = f"Report ready for {scope_desc}." return status, report_text except ValueError as error: return f"⚠️ {error}", "" except Exception as error: return f"Error generating report: {error}", "" def _safe_upload_pdfs(files, selected_id, profile: gr.OAuthProfile | None): """Upload PDF files for the selected notebook.""" try: user_id = _user_id(profile) if not user_id: return "Please sign in with Hugging Face before uploading PDFs." if not selected_id: return "Select a notebook first, then upload PDFs." if not files: return "Choose at least one PDF to upload." if isinstance(files, str): file_paths = [files] else: file_paths = [] for file_item in files: file_path = getattr(file_item, "name", file_item) if file_path: file_paths.append(file_path) if not file_paths: return "No files were received. Try uploading again." target_dir = Path("data") / "uploads" / user_id / str(selected_id) target_dir.mkdir(parents=True, exist_ok=True) uploaded = [] total_chunks = 0 for file_path in file_paths: source_path = Path(file_path) if source_path.suffix.lower() != ".pdf": continue destination = target_dir / source_path.name if destination.exists(): index = 1 while True: candidate = target_dir / f"{source_path.stem}_{index}{source_path.suffix}" if not candidate.exists(): destination = candidate break index += 1 shutil.copy2(source_path, destination) uploaded.append(destination.name) total_chunks += ingest_pdf_chunks(str(selected_id), destination.name, destination) if not uploaded: return "Only .pdf files are allowed." return f"Uploaded {len(uploaded)} PDF(s): {', '.join(uploaded)}. Indexed {total_chunks} chunk(s) for RAG." except Exception as error: return f"Error uploading PDFs: {error}" def _list_uploaded_pdfs(selected_id, profile: gr.OAuthProfile | None = None): """List uploaded PDFs for the selected notebook.""" user_id = _user_id(profile) if not user_id or not selected_id: return gr.update(choices=[], value=None) target_dir = Path("data") / "uploads" / user_id / str(selected_id) if not target_dir.exists(): return gr.update(choices=[], value=None) pdf_names = sorted([path.name for path in target_dir.glob("*.pdf")]) selected_name = pdf_names[0] if pdf_names else None return gr.update(choices=pdf_names, value=selected_name) def _safe_remove_pdf(file_name, selected_id, profile: gr.OAuthProfile | None = None): """Remove one uploaded PDF from the selected notebook.""" try: user_id = _user_id(profile) if not user_id: return "Please sign in with Hugging Face before removing PDFs." if not selected_id: return "Select a notebook first." if not file_name: return "Select a PDF to remove." safe_name = Path(file_name).name target_file = Path("data") / "uploads" / user_id / str(selected_id) / safe_name if not target_file.exists() or target_file.suffix.lower() != ".pdf": return "Selected PDF was not found." target_file.unlink() remove_chunks_for_source(str(selected_id), safe_name) return f"Removed PDF: {safe_name}" except Exception as error: return f"Error removing PDF: {error}" def _url_source_id(url: str) -> str: """Stable source_id so re-ingesting the same URL overwrites old chunks.""" h = hashlib.sha256(url.encode("utf-8", errors="ignore")).hexdigest()[:16] return f"url_{h}" def _safe_ingest_url(url, selected_id, profile: gr.OAuthProfile | None = None): """Ingest one URL into chunks table for the selected notebook.""" try: user_id = _user_id(profile) if not user_id: return "", "Please sign in with Hugging Face before ingesting a URL." if not selected_id: return "", "Select a notebook first, then ingest a URL." cleaned = (url or "").strip() if not cleaned: return "", "Enter a URL." if not (cleaned.startswith("http://") or cleaned.startswith("https://")): return "", "URL must start with http:// or https://" source_id = _url_source_id(cleaned) chunk_count = ingest_url_chunks(str(selected_id), source_id, cleaned) if chunk_count == 0: return "", ( "Ingested URL but extracted 0 chunks. Page may be JS-rendered/blocked/non-text. " "Try a simpler static page (example.com / Wikipedia)." ) return "", f"Ingested URL. Indexed {chunk_count} chunk(s). Source: {cleaned}" except Exception as error: return "", f"Error ingesting URL: {error}" def _safe_remove_url(url, selected_id, profile: gr.OAuthProfile | None = None): try: user_id = _user_id(profile) if not user_id: return "", "Please sign in with Hugging Face before ingesting a URL." if not selected_id: return "", "Select a notebook first, then remove a URL." cleaned = (url or "").strip() if not cleaned: return "", "Enter a URL." if not (cleaned.startswith("http://") or cleaned.startswith("https://")): return "", "URL must start with http:// or https://" source_id = _url_source_id(cleaned) remove_chunks_for_source(str(selected_id), source_id) return "", f"Removed URL: {cleaned}" except Exception as error: return "", f"Error removing URL: {error}" # ── Upload Handler Functions ────────────────────────────────── def _do_upload(text_content, title, notebook_id, profile: gr.OAuthProfile | None): """Handle direct text input and ingestion.""" from backend.ingestion_txt import ingest_txt user_id = _user_id(profile) if not user_id: return "Please sign in first." if not notebook_id: return "Please select a notebook first." if not text_content or not text_content.strip(): return "No text entered." try: filename = (title or "").strip() if not filename: filename = f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}" if not filename.endswith(".txt"): filename = filename + ".txt" file_bytes = text_content.encode("utf-8") result = ingest_txt( file_bytes=file_bytes, filename=filename, notebook_id=notebook_id, user_id=user_id ) meta = result["metadata"] return ( f" **{result['filename']}** saved successfully!\n\n" f"- Size: {meta['size_bytes'] / 1024:.1f} KB" ) except ValueError as e: return f" {str(e)}" except Exception as e: return f"Unexpected error: {str(e)}" def _format_sources(sources: list[dict]) -> str: if not sources: return "No sources yet." lines = ["| Filename | Type | Status | Words |", "|----------|------|--------|-------|"] for s in sources: meta = s.get("metadata") or {} words = meta.get("word_count", "—") lines.append(f"| {s['filename']} | {s['file_type']} | {s['status']} | {words} |") return "\n".join(lines) def _load_sources(notebook_id, profile: gr.OAuthProfile | None): from backend.ingestion_txt import list_sources if not notebook_id: return "" sources = list_sources(notebook_id) return _format_sources(sources) def _safe_generate_podcast(notebook_id, profile: gr.OAuthProfile | None = None): user_id = _user_id(profile) if not user_id: return "Please sign in first.", "" if not notebook_id: return "Please select a notebook first.", "" try: result = generate_podcast(notebook_id=str(notebook_id), user_id=user_id) status = ( f"Podcast generated. Artifact: {result['artifact_id'] or 'saved'} | " f"Sources: {result['sources_count']} | Chunks: {result['chunks_used']}" ) return status, result["script"] except Exception as error: return f"Error generating podcast: {error}", "" def _safe_generate_podcast_audio(notebook_id, script, profile: gr.OAuthProfile | None = None): user_id = _user_id(profile) if not user_id: return "Please sign in first.", None if not notebook_id: return "Please select a notebook first.", None if not script or not script.strip(): return "Generate a podcast script first.", None try: result = generate_podcast_audio(notebook_id=str(notebook_id), user_id=user_id, script=script) status = f"Podcast audio generated. Artifact: {result['artifact_id'] or 'saved'}" return status, result["audio_path"] except Exception as error: return f"Error generating podcast audio: {error}", None # Quiz Handlers def _get_notebook_pdfs(notebook_id, profile: gr.OAuthProfile | None): user_id = _user_id(profile) if not user_id or not notebook_id: return gr.update(choices=[], value=None, visible=False) target_dir = Path("data") / "uploads" / user_id / str(notebook_id) if not target_dir.exists(): return gr.update(choices=[], value=None, visible=False) pdfs = sorted([p.name for p in target_dir.glob("*.pdf")]) return gr.update(choices=pdfs, value=pdfs[0] if pdfs else None, visible=True) def _generate_quiz(notebook_id, source_type, pdf_source_id, profile: gr.OAuthProfile | None): from backend.quiz_service import generate_quiz user_id = _user_id(profile) if not user_id: return "Please sign in first.", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" if not notebook_id: return "Please select a notebook first.", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" type_map = {"Text": "txt", "PDF": "pdf", "URL": "url", "All": "all"} source_type_key = type_map.get(source_type, "all") if source_type_key == "pdf" and not pdf_source_id: return "Pick a PDF first.", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" try: result = generate_quiz(notebook_id, source_type=source_type_key, source_id=pdf_source_id) questions = result["questions"] updates = [] for i in range(5): if i < len(questions): q = questions[i] q_label = f"**Q{i+1}. {q['question']}**" if q["type"] == "multiple_choice": updates += [gr.update(visible=True), gr.update(value=q_label), gr.update(choices=q["options"], value=None, visible=True), gr.update(value="", visible=False)] elif q["type"] == "true_false": updates += [gr.update(visible=True), gr.update(value=q_label), gr.update(choices=["True", "False"], value=None, visible=True), gr.update(value="", visible=False)] else: # change this line for short_answer: updates += [gr.update(visible=True), gr.update(value=q_label), gr.update(choices=[], value=None, visible=False), gr.update(value="", visible=True)] else: updates += [gr.update(visible=False), gr.update(value=""), gr.update(choices=[], value=None, visible=False), gr.update(value="", visible=False)] return "Quiz generated!", questions, *updates, gr.update(visible=True), "" except Exception as e: return f" {e}", [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), "" def _submit_quiz(questions, *answers): if not questions: return " No quiz loaded." score = 0 lines = [] for i, q in enumerate(questions): radio_ans = answers[i] or "" text_ans = answers[i + 5] or "" user_ans = text_ans.strip() if q["type"] == "short_answer" else radio_ans.strip() correct = q["answer"].strip() if not user_ans: is_correct = False elif q["type"] == "multiple_choice": user_letter = user_ans.split(".")[0].strip().upper() correct_letter = correct[0].upper() is_correct = user_letter == correct_letter elif q["type"] == "true_false": is_correct = user_ans.lower() == correct.lower() else: is_correct = user_ans.lower() in correct.lower() or correct.lower() in user_ans.lower() if is_correct: score += 1 lines.append(f"✅ **Q{i+1}**: Correct! *(Answer: {correct})*") else: lines.append(f"❌ **Q{i+1}**: Incorrect. *(Your answer: {user_ans or 'blank'} | Correct: {correct})*") lines.append(f"\n**Score: {score}/{len(questions)}**") return "\n\n".join(lines) def _chat_history_to_pairs(messages: list[dict]) -> list[tuple[str, str]]: """Convert load_chat output to Gradio Chatbot format [(user, assistant), ...].""" pairs = [] i = 0 while i < len(messages): m = messages[i] if m["role"] == "user": user_content = m["content"] or "" asst_content = "" if i + 1 < len(messages) and messages[i + 1]["role"] == "assistant": asst_content = messages[i + 1]["content"] or "" i += 1 pairs.append((user_content, asst_content)) i += 1 return pairs def _load_chat_history(notebook_id, user_id: str | None) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]: """Load chat for notebook. Returns (history_pairs, history_pairs) for State and Chatbot. Only loads if notebook belongs to user (ownership validation). """ if not notebook_id: return [], [] messages = load_chat(notebook_id, user_id) pairs = _chat_history_to_pairs(messages) return pairs, pairs def _format_citations(chunks: list[dict]) -> str: """Format retrieved chunks for citation display.""" if not chunks: return "" lines = ["**Sources cited:**", ""] for i, c in enumerate(chunks, 1): meta = c.get("metadata") or {} source_label = meta.get("url") or meta.get("file_name") or meta.get("file_path") or "Source" content = (c.get("content") or "")[:300] if len(c.get("content") or "") > 300: content += "..." content = content.replace("\n", " ") lines.append(f"**[{i}]** *{source_label}*") lines.append(f"> {content}") lines.append("") return "\n".join(lines) def _on_chat_submit(query, notebook_id, chat_history, profile: gr.OAuthProfile | None): """Handle chat submit: call RAG, return updated history and citations.""" if not notebook_id: return "", chat_history, "Select a notebook first.", "", gr.update(visible=False) if not query or not query.strip(): return "", chat_history, "Enter a message.", "", gr.update(visible=False) user_id = _user_id(profile) if not user_id: return "", chat_history, "Please sign in first.", "", gr.update(visible=False) try: answer, updated, chunks = rag_chat(notebook_id, query.strip(), chat_history, user_id=user_id) citations_md = _format_citations(chunks) accordion_update = gr.update(visible=True) if chunks else gr.update(visible=False) return "", updated, "", citations_md, accordion_update except Exception as e: return "", chat_history, f"Error: {e}", "", gr.update(visible=False) def _get_quiz_pdfs(source_type, notebook_id): if source_type != "PDF": return gr.update(visible=False, choices=[], value=None) if not notebook_id: return gr.update(visible=False, choices=[], value=None) # Search across all users for this notebook_id base = Path("data") / "uploads" pdfs = [] if base.exists(): for user_dir in base.iterdir(): nb_dir = user_dir / str(notebook_id) if nb_dir.exists(): pdfs = sorted([p.name for p in nb_dir.glob("*.pdf")]) break print(f"DEBUG quiz pdfs found: {pdfs}") return gr.update(visible=True, choices=pdfs, value=pdfs[0] if pdfs else None) def _quiz_pdf_dropdown_update(source_type, notebook_id, profile: gr.OAuthProfile | None): if source_type != "PDF": return gr.update(visible=False, choices=[], value=None) if not notebook_id: return gr.update(visible=True, choices=[], value=None) user_id = _user_id(profile) # Try with user_id first (production) if user_id: target_dir = Path("data") / "uploads" / user_id / str(notebook_id) if target_dir.exists(): pdfs = sorted([p.name for p in target_dir.glob("*.pdf")]) return gr.update(visible=True, choices=pdfs, value=pdfs[0] if pdfs else None) # Fallback for local dev (no OAuth): scan all user folders base = Path("data") / "uploads" if base.exists(): for user_dir in base.iterdir(): if not user_dir.is_dir(): continue nb_dir = user_dir / str(notebook_id) if nb_dir.exists(): pdfs = sorted([p.name for p in nb_dir.glob("*.pdf")]) print(f"DEBUG (local fallback): notebook_id={notebook_id}, pdfs={pdfs}") return gr.update(visible=True, choices=pdfs, value=pdfs[0] if pdfs else None) return gr.update(visible=True, choices=[], value=None) def _generate_btn_update(source_type, pdf_name): if source_type == "PDF": return gr.update(interactive=bool(pdf_name)) return gr.update(interactive=True) with gr.Blocks( title="NotebookLM Clone - Notebooks", theme=theme, css=CUSTOM_CSS, ) as demo: with gr.Row(elem_classes=["header-bar"]): gr.Markdown("### 📓 NotebookLM Clone") login_btn = gr.LoginButton(value="Login with Hugging Face", size="lg", elem_id="login-btn") with gr.Row(visible=False) as auth_info_row: auth_text = gr.Markdown("", elem_id="auth-text") gr.HTML("""
Chat with your documents. Generate reports, quizzes, and podcasts.