# NotebookLM / pages /sources.py
# internomega-terrablue
# you tube api fix 2
# 23e38ab
"""Sources tab: file upload, URL input, and source list management."""
import uuid
from datetime import datetime
from state import UserData, Source, get_active_notebook
# File extensions accepted for upload (compared against the lower-cased extension).
ALLOWED_TYPES = ["pdf", "pptx", "txt"]
# Uploaded files larger than this many megabytes are skipped.
MAX_FILE_SIZE_MB = 15
# Upper bound on the number of sources a single notebook may hold.
MAX_SOURCES_PER_NOTEBOOK = 20

# Presentation data per source type, keyed by ``Source.file_type``.
# Icons are spelled as \N{...} escapes so the file stays pure ASCII: the
# previous literals were UTF-8 emoji that had been decoded with the wrong
# codec and rendered as garbage. NOTE(review): the txt and url icons had lost
# a byte, so MEMO and GLOBE WITH MERIDIANS are the most plausible originals --
# confirm against the intended design.
# "color" is presumably an "R,G,B" triple for CSS rgba(); it is not read by
# the code visible in this file.
FILE_TYPE_CONFIG = {
    "pdf": {"icon": "\N{CLOSED BOOK}", "color": "239,68,68", "label": "PDF"},
    "pptx": {"icon": "\N{BAR CHART}", "color": "249,115,22", "label": "PPTX"},
    "txt": {"icon": "\N{MEMO}", "color": "59,130,246", "label": "TXT"},
    "url": {"icon": "\N{GLOBE WITH MERIDIANS}", "color": "34,197,94", "label": "URL"},
    "youtube": {"icon": "\N{CLAPPER BOARD}", "color": "239,68,68", "label": "YouTube"},
}
def render_source_header(state: UserData) -> str:
    """Build the HTML header row for the Sources tab.

    The row shows the "Sources" title, a ``used / max`` counter pill and the
    number of slots still free in the active notebook. Returns an empty
    string when there is no active notebook.
    """
    notebook = get_active_notebook(state)
    if not notebook:
        return ""

    used = len(notebook.sources)
    free = MAX_SOURCES_PER_NOTEBOOK - used

    pieces = [
        '<div style="display:flex; align-items:center; justify-content:space-between; margin-bottom:20px;">',
        '<div>',
        '<span style="font-size:1.1rem; font-weight:600; color:#e0e0f0;">Sources</span>',
        '<span style="margin-left:10px; padding:3px 10px; background:rgba(102,126,234,0.15); ',
        'color:#8090d0; border-radius:12px; font-size:0.8rem; font-weight:600;">',
        f'{used} / {MAX_SOURCES_PER_NOTEBOOK}</span>',
        '</div>',
        f'<span style="font-size:0.8rem; color:#606078;">{free} slots remaining</span>',
        '</div>',
    ]
    return "".join(pieces)
def render_source_list(state: UserData) -> str:
    """Render the active notebook's sources as HTML cards.

    Returns an empty-state placeholder when there is no active notebook or it
    has no sources. Otherwise returns a heading followed by one
    ``source-card`` div per source containing its icon, name, a metadata line
    (type label, size, chunk count) and a status badge.

    Source names, URLs and error messages originate from user input or
    external services, so every dynamic value is HTML-escaped before it is
    interpolated into markup or attributes.
    """
    # Function-scope import: only ``escape`` is needed here.
    from html import escape

    nb = get_active_notebook(state)
    if not nb or not nb.sources:
        return (
            '<div style="text-align:center; padding:50px 20px; color:#606078;">'
            '<div style="font-size:3rem; margin-bottom:16px;">\N{PAGE FACING UP}</div>'
            '<h3 style="color:#a0a0b8; font-weight:600;">No sources yet</h3>'
            '<p style="font-size:0.9rem;">Upload documents or add web links above.<br>'
            'Your sources power the AI chat and artifact generation.</p>'
            '</div>'
        )

    # Collect fragments and join once instead of growing a string in the loop.
    parts = [
        f'<p style="font-weight:600; font-size:0.9rem; color:#a0a0b8; margin-bottom:12px;">'
        f'Your Sources ({len(nb.sources)})</p>'
    ]
    for source in nb.sources:
        ft = source.file_type
        cfg = FILE_TYPE_CONFIG.get(ft)
        if cfg is None:
            # Unknown type: generic page icon, label derived from the type name.
            cfg = {"icon": "\N{PAGE FACING UP}", "color": "150,150,170", "label": ft.upper()}

        meta_parts = [cfg["label"]]
        if source.size_mb:  # URL sources carry no size (None)
            meta_parts.append(f"{source.size_mb} MB")
        if source.chunk_count > 0:
            meta_parts.append(f"{source.chunk_count} chunks")
        meta_str = " \N{MIDDLE DOT} ".join(meta_parts)

        if source.status == "processing":
            badge = '<span class="source-badge processing">Processing...</span>'
        elif source.status == "failed":
            err = source.error_message or "Unknown error"
            # escape() also encodes quotes, keeping the title attribute intact.
            badge = f'<span class="source-badge failed" title="{escape(str(err))}">Failed</span>'
        else:
            badge = '<span class="source-badge ready">Ready</span>'

        parts.append(
            f'<div class="source-card">'
            f'<div class="source-icon {escape(ft)}">{cfg["icon"]}</div>'
            f'<div class="source-info">'
            f'<div class="name">{escape(str(source.filename))}</div>'
            f'<div class="meta">{escape(meta_str)}</div>'
            f'</div>'
            f'{badge}'
            f'</div>'
        )
    return "".join(parts)
def get_source_choices(state: UserData) -> list[str]:
    """Return the filenames of the active notebook's sources whose status is "ready".

    Yields an empty list when there is no active notebook.
    """
    ready_names: list[str] = []
    notebook = get_active_notebook(state)
    if notebook:
        for src in notebook.sources:
            if src.status == "ready":
                ready_names.append(src.filename)
    return ready_names
def handle_file_upload(files, state: UserData) -> tuple[UserData, str, str, list[str]]:
    """Register uploaded files as new sources on the active notebook.

    Each accepted file is appended to the notebook's sources with status
    "processing". A file is skipped when its name duplicates an existing
    source, its extension is not in ``ALLOWED_TYPES``, or it is larger than
    ``MAX_FILE_SIZE_MB``. Processing stops once the notebook holds
    ``MAX_SOURCES_PER_NOTEBOOK`` sources.

    Args:
        files: Iterable of uploaded items; each is either an object exposing
            the on-disk path as ``.name`` or something whose ``str()`` is
            that path.
        state: Current user state holding the active notebook.

    Returns:
        ``(state, source_list_html, header_html, source_choices)``.
    """
    import os  # function-scope import, hoisted out of the per-file loop

    nb = get_active_notebook(state)
    if nb and files:
        # Build the name set once and keep it in sync, instead of rebuilding
        # a list for every uploaded file.
        known_names = {s.filename for s in nb.sources}

        for f in files:
            if len(nb.sources) >= MAX_SOURCES_PER_NOTEBOOK:
                break

            raw_path = f.name if hasattr(f, "name") else str(f)
            # basename() handles the platform's separators (incl. "\\" on
            # Windows), which the previous "/"-only split did not.
            filename = os.path.basename(raw_path)
            if filename in known_names:
                continue

            _, dot, suffix = filename.rpartition(".")
            file_ext = suffix.lower() if dot else ""
            if file_ext not in ALLOWED_TYPES:
                continue

            try:
                size_mb = round(os.path.getsize(raw_path) / (1024 * 1024), 2)
                file_path = raw_path
            except (OSError, TypeError, ValueError):
                # The file could not be stat'ed. As before, the source is still
                # registered, but without a path and with size 0.
                file_path = None
                size_mb = 0

            if size_mb > MAX_FILE_SIZE_MB:
                continue

            nb.sources.append(
                Source(
                    id=str(uuid.uuid4()),
                    filename=filename,
                    file_type=file_ext,
                    size_mb=size_mb,
                    source_url=None,
                    chunk_count=0,
                    status="processing",
                    error_message=None,
                    created_at=datetime.now().isoformat(),
                    file_path=file_path,
                )
            )
            known_names.add(filename)

    return state, render_source_list(state), render_source_header(state), get_source_choices(state)
def handle_url_add(url: str, state: UserData) -> tuple[UserData, str, str, str, list[str]]:
    """Add a web or YouTube URL as a new source on the active notebook.

    Nothing is added when there is no active notebook, the URL is blank, the
    notebook already holds ``MAX_SOURCES_PER_NOTEBOOK`` sources, or the same
    URL is already present. A new source starts with status "processing" and
    a display name truncated to 55 characters plus "...".

    The URL is classified as "youtube" by its *hostname* (youtube.com,
    youtu.be and their subdomains). The previous substring test also matched
    URLs that merely mentioned "youtube.com" in a path or query string.

    Returns:
        ``(state, source_list_html, header_html, url_textbox_value,
        source_choices)``; the textbox value is always "" so the input is
        cleared.
    """
    from urllib.parse import urlparse

    nb = get_active_notebook(state)
    url = (url or "").strip()

    can_add = (
        bool(nb)
        and bool(url)
        and len(nb.sources) < MAX_SOURCES_PER_NOTEBOOK
        and all(s.source_url != url for s in nb.sources)
    )
    if can_add:
        try:
            parsed = urlparse(url)
            if not parsed.netloc:
                # Scheme-less input such as "youtube.com/watch?v=...": reparse
                # as a network-path reference so the host is recognised.
                parsed = urlparse("//" + url)
            host = parsed.hostname or ""
        except ValueError:
            # Malformed authority (e.g. unbalanced IPv6 brackets): treat as a plain URL.
            host = ""

        is_youtube = any(
            host == domain or host.endswith("." + domain)
            for domain in ("youtube.com", "youtu.be")
        )
        display_name = url[:55] + "..." if len(url) > 55 else url

        nb.sources.append(
            Source(
                id=str(uuid.uuid4()),
                filename=display_name,
                file_type="youtube" if is_youtube else "url",
                size_mb=None,
                source_url=url,
                chunk_count=0,
                status="processing",
                error_message=None,
                created_at=datetime.now().isoformat(),
                file_path=None,
            )
        )

    return state, render_source_list(state), render_source_header(state), "", get_source_choices(state)
def handle_source_delete(source_name: str, state: UserData) -> tuple[UserData, str, str, list[str]]:
    """Delete every source named ``source_name`` and its stored vectors.

    Vector deletion (Pinecone, per the original docstring) is best-effort: a
    failure is logged and the source is still removed from the notebook.
    Vectors are deleted for *all* sources sharing the name, because the
    notebook list is filtered by name. Previously only the first match had
    its vectors removed, which could leave orphaned vectors behind.

    Returns:
        ``(state, source_list_html, header_html, source_choices)``.
    """
    nb = get_active_notebook(state)
    if nb and source_name:
        doomed = [s for s in nb.sources if s.filename == source_name]
        if doomed:
            import logging

            log = logging.getLogger(__name__)
            try:
                # Imported lazily so a missing/misconfigured vector backend
                # cannot prevent the UI from removing the source.
                from persistence.vector_store import VectorStore

                store = VectorStore()
            except Exception:
                log.warning(
                    "Vector store unavailable; vectors for %r were not deleted",
                    source_name,
                    exc_info=True,
                )
                store = None

            if store is not None:
                for source in doomed:
                    try:
                        store.delete_by_source(source.id, nb.id)
                    except Exception:
                        # Best-effort: keep going so one failure does not
                        # block the remaining deletions.
                        log.warning(
                            "Failed to delete vectors for source %s",
                            source.id,
                            exc_info=True,
                        )

        nb.sources = [s for s in nb.sources if s.filename != source_name]

    return state, render_source_list(state), render_source_header(state), get_source_choices(state)
def run_ingestion_pipeline(state: UserData) -> tuple[UserData, str, str, list[str]]:
    """Run every "processing" source of the active notebook through ingestion.

    Each such source is handed to ``IngestionManager.process_source``, which
    returns ``(chunk_count, error)``. A truthy error marks the source
    "failed" (recording the error, zero chunks); otherwise it becomes "ready"
    with the reported chunk count.

    Returns:
        ``(state, source_list_html, header_html, source_choices)``.
    """
    from ingestion_engine import IngestionManager

    notebook = get_active_notebook(state)
    if notebook:
        engine = IngestionManager()
        for item in notebook.sources:
            if item.status == "processing":
                chunks, failure = engine.process_source(item, item.file_path, notebook.id)
                # Tuple assignment keeps the original write order:
                # status, then error_message, then chunk_count.
                item.status, item.error_message, item.chunk_count = (
                    ("failed", failure, 0) if failure else ("ready", None, chunks)
                )

    return state, render_source_list(state), render_source_header(state), get_source_choices(state)