Spaces:
Sleeping
Sleeping
| """Sources tab: file upload, URL input, and source list management.""" | |
| import uuid | |
| from datetime import datetime | |
| from state import UserData, Source, get_active_notebook | |
# File extensions (lower-case, without the dot) accepted by the upload handler.
ALLOWED_TYPES = ["pdf", "pptx", "txt"]
# Upper bound on the size of a single uploaded file, in mebibytes.
MAX_FILE_SIZE_MB = 15
# Upper bound on the number of sources (files + URLs) a notebook may hold.
MAX_SOURCES_PER_NOTEBOOK = 20

# Per-source-type presentation data used when rendering source cards:
#   icon  - glyph shown in the card's icon slot
#   color - "R,G,B" triple (not read by the rendering code visible in this file)
#   label - short type name shown in the card's metadata line
#
# NOTE(review): the "π" icons below look like emoji whose UTF-8 bytes were
# decoded with the wrong code page; the original characters cannot be
# recovered from what is left, so they are kept as-is. Please restore the
# intended emoji from the upstream file.
FILE_TYPE_CONFIG = {
    "pdf": {"icon": "π", "color": "239,68,68", "label": "PDF"},
    "pptx": {"icon": "π", "color": "249,115,22", "label": "PPTX"},
    "txt": {"icon": "π", "color": "59,130,246", "label": "TXT"},
    "url": {"icon": "π", "color": "34,197,94", "label": "URL"},
    # Was the mojibake "π¬": bytes F0 9F 8E AC are U+1F3AC (clapper board).
    # Written as an escape so the literal survives any further re-encoding.
    "youtube": {"icon": "\U0001F3AC", "color": "239,68,68", "label": "YouTube"},
}
def render_source_header(state: UserData) -> str:
    """Build the HTML header strip shown above the source list.

    The strip contains the "Sources" title, a "<used> / <max>" counter pill and
    a "<n> slots remaining" note, where <max> is MAX_SOURCES_PER_NOTEBOOK.

    Returns:
        The header markup, or an empty string when there is no active notebook.
    """
    notebook = get_active_notebook(state)
    if not notebook:
        return ""

    used = len(notebook.sources)
    free = MAX_SOURCES_PER_NOTEBOOK - used

    title = '<span style="font-size:1.1rem; font-weight:600; color:#e0e0f0;">Sources</span>'
    counter_pill = (
        '<span style="margin-left:10px; padding:3px 10px; background:rgba(102,126,234,0.15); '
        'color:#8090d0; border-radius:12px; font-size:0.8rem; font-weight:600;">'
        f'{used} / {MAX_SOURCES_PER_NOTEBOOK}</span>'
    )
    slots_note = f'<span style="font-size:0.8rem; color:#606078;">{free} slots remaining</span>'

    pieces = [
        '<div style="display:flex; align-items:center; justify-content:space-between; margin-bottom:20px;">',
        '<div>',
        title,
        counter_pill,
        '</div>',
        slots_note,
        '</div>',
    ]
    return "".join(pieces)
def render_source_list(state: UserData) -> str:
    """Render the active notebook's sources as a block of HTML cards.

    When there is no active notebook, or it has no sources, an empty-state
    placeholder is returned. Otherwise the result is a "Your Sources (N)"
    heading followed by one ``source-card`` per source containing its icon,
    name, a metadata line (type label, size, chunk count) and a status badge
    ("Processing...", "Failed" with the error as tooltip, or "Ready").

    Source names, file types and error messages originate from uploads, URLs
    and the ingestion step, so they are HTML-escaped before interpolation.

    Returns:
        The HTML markup as a single string.
    """
    # Function-scope import: keeps this block self-contained. Imported by name
    # so it cannot be confused with a local variable called ``html``.
    from html import escape

    nb = get_active_notebook(state)
    if not nb or not nb.sources:
        # NOTE(review): the "π" glyph below looks like a mis-encoded emoji; the
        # original character cannot be recovered from here, so it is kept.
        return (
            '<div style="text-align:center; padding:50px 20px; color:#606078;">'
            '<div style="font-size:3rem; margin-bottom:16px;">π</div>'
            '<h3 style="color:#a0a0b8; font-weight:600;">No sources yet</h3>'
            '<p style="font-size:0.9rem;">Upload documents or add web links above.<br>'
            'Your sources power the AI chat and artifact generation.</p>'
            '</div>'
        )

    # Collect fragments and join once instead of growing a string with +=.
    fragments = [
        f'<p style="font-weight:600; font-size:0.9rem; color:#a0a0b8; margin-bottom:12px;">'
        f'Your Sources ({len(nb.sources)})</p>'
    ]
    for source in nb.sources:
        # Guard against a missing type so .upper()/escape() below cannot fail.
        ft = source.file_type or ""
        # Unknown types fall back to a neutral style with the raw type as label.
        # NOTE(review): the fallback "π" icon also looks mis-encoded; kept as-is.
        cfg = FILE_TYPE_CONFIG.get(
            ft, {"icon": "π", "color": "150,150,170", "label": ft.upper()}
        )

        meta_parts = [cfg["label"]]
        if source.size_mb:
            meta_parts.append(f"{source.size_mb} MB")
        if source.chunk_count > 0:
            meta_parts.append(f"{source.chunk_count} chunks")
        # U+00B7 middle dot; the original literal was its mis-decoded form.
        meta_str = escape(" \u00b7 ".join(meta_parts))

        if source.status == "processing":
            badge = '<span class="source-badge processing">Processing...</span>'
        elif source.status == "failed":
            # escape() also encodes quotes, so the message cannot terminate
            # the title="..." attribute early.
            err = escape(source.error_message or "Unknown error")
            badge = f'<span class="source-badge failed" title="{err}">Failed</span>'
        else:
            badge = '<span class="source-badge ready">Ready</span>'

        fragments.append(
            f'<div class="source-card">'
            f'<div class="source-icon {escape(ft)}">{cfg["icon"]}</div>'
            f'<div class="source-info">'
            f'<div class="name">{escape(source.filename)}</div>'
            f'<div class="meta">{meta_str}</div>'
            f'</div>'
            f'{badge}'
            f'</div>'
        )
    return "".join(fragments)
def get_source_choices(state: UserData) -> list[str]:
    """List the filenames of the active notebook's sources whose status is "ready".

    Returns:
        Filenames in notebook order; an empty list when there is no active
        notebook.
    """
    ready_names: list[str] = []
    notebook = get_active_notebook(state)
    if notebook:
        for entry in notebook.sources:
            if entry.status == "ready":
                ready_names.append(entry.filename)
    return ready_names
def handle_file_upload(files, state: UserData) -> tuple[UserData, str, str, list[str]]:
    """Register uploaded files as new sources on the active notebook.

    Each entry of ``files`` is either an object exposing a ``name`` attribute
    holding its path, or something whose ``str()`` is the path. A file is
    skipped when its name is already present in the notebook, its extension is
    not in ALLOWED_TYPES, or it is larger than MAX_FILE_SIZE_MB. Processing
    stops once the notebook holds MAX_SOURCES_PER_NOTEBOOK sources. Accepted
    files are appended with status "processing".

    Args:
        files: Iterable of uploaded file handles/paths; may be None or empty.
        state: Per-user state whose active notebook receives the sources.

    Returns:
        (state, source_list_html, header_html, source_choices).
    """
    # Function-scope import, as in the original block, hoisted out of the loop.
    import os

    nb = get_active_notebook(state)
    if nb and files:
        max_bytes = MAX_FILE_SIZE_MB * 1024 * 1024
        # Built once and kept current so duplicates inside one batch are caught
        # without rescanning nb.sources for every file.
        taken_names = {s.filename for s in nb.sources}

        for f in files:
            raw_path = str(getattr(f, "name", f))
            # basename() honours the platform's separators, not just "/".
            filename = os.path.basename(raw_path)

            if filename in taken_names:
                continue
            if len(nb.sources) >= MAX_SOURCES_PER_NOTEBOOK:
                break

            # Text after the last dot, lower-cased; "" when there is no dot.
            _, dot, suffix = filename.rpartition(".")
            file_ext = suffix.lower() if dot else ""
            if file_ext not in ALLOWED_TYPES:
                continue

            try:
                size_bytes = os.path.getsize(raw_path)
            except (OSError, TypeError, ValueError):
                # Size unknown: keep the source but record no usable path.
                file_path = None
                size_bytes = 0
            else:
                file_path = raw_path

            # Compare exact bytes: the rounded MB figure would let a file a
            # few KB over the limit round down and slip through.
            if size_bytes > max_bytes:
                continue

            nb.sources.append(
                Source(
                    id=str(uuid.uuid4()),
                    filename=filename,
                    file_type=file_ext,
                    size_mb=round(size_bytes / (1024 * 1024), 2),
                    source_url=None,
                    chunk_count=0,
                    status="processing",
                    error_message=None,
                    created_at=datetime.now().isoformat(),
                    file_path=file_path,
                )
            )
            taken_names.add(filename)

    return state, render_source_list(state), render_source_header(state), get_source_choices(state)
def _is_youtube_url(url: str) -> bool:
    """Return True when the host of *url* is youtube.com, youtu.be or a subdomain."""
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url)
        if not parts.netloc:
            # Scheme-less input ("youtube.com/watch?v=...") parses as a bare
            # path; re-parse it as a network location.
            parts = urlsplit("//" + url)
        host = (parts.hostname or "").rstrip(".")
    except ValueError:
        # urlsplit rejects some malformed netlocs (e.g. unbalanced brackets).
        return False
    return any(
        host == domain or host.endswith("." + domain)
        for domain in ("youtube.com", "youtu.be")
    )


def handle_url_add(url: str, state: UserData) -> tuple[UserData, str, str, str, list[str]]:
    """Register a web or YouTube URL as a new source on the active notebook.

    Nothing is added when there is no active notebook, the URL is blank, the
    notebook already holds MAX_SOURCES_PER_NOTEBOOK sources, or the same URL is
    already registered. Otherwise a source with status "processing" is
    appended; its display name is the URL, truncated to 55 characters plus
    "..." when longer. The type is "youtube" when the URL's host is a YouTube
    domain (checked on the parsed hostname, not as a substring of the whole
    URL, so query strings mentioning youtube.com are not misclassified) and
    "url" otherwise.

    Args:
        url: Raw text from the URL input; surrounding whitespace is ignored.
        state: Per-user state whose active notebook receives the source.

    Returns:
        (state, source_list_html, header_html, url_textbox_value,
        source_choices); the textbox value is always "" so the input is cleared.
    """
    nb = get_active_notebook(state)
    url = url.strip() if url else ""

    if (
        nb
        and url
        and len(nb.sources) < MAX_SOURCES_PER_NOTEBOOK
        and all(s.source_url != url for s in nb.sources)
    ):
        nb.sources.append(
            Source(
                id=str(uuid.uuid4()),
                filename=url[:55] + "..." if len(url) > 55 else url,
                file_type="youtube" if _is_youtube_url(url) else "url",
                size_mb=None,
                source_url=url,
                chunk_count=0,
                status="processing",
                error_message=None,
                created_at=datetime.now().isoformat(),
                file_path=None,
            )
        )

    return state, render_source_list(state), render_source_header(state), "", get_source_choices(state)
def handle_source_delete(source_name: str, state: UserData) -> tuple[UserData, str, str, list[str]]:
    """Delete every source named *source_name* and purge its stored vectors.

    All sources whose filename equals ``source_name`` are removed from the
    active notebook. Vector deletion (``VectorStore.delete_by_source``) is
    attempted for each of them, not only the first match, so sources that share
    a display name do not leave vectors behind. Vector deletion is best-effort:
    failures are logged and the source is removed from state regardless.

    Args:
        source_name: Filename / display name of the source(s) to remove.
        state: Per-user state holding the active notebook.

    Returns:
        (state, source_list_html, header_html, source_choices).
    """
    nb = get_active_notebook(state)
    if not nb or not source_name:
        return state, render_source_list(state), render_source_header(state), get_source_choices(state)

    doomed = [s for s in nb.sources if s.filename == source_name]
    if doomed:
        import logging

        log = logging.getLogger(__name__)
        try:
            # Imported lazily, as in the original, so the vector store is only
            # loaded when there is something to delete.
            from persistence.vector_store import VectorStore

            store = VectorStore()
        except Exception:
            log.warning("Vector store unavailable; skipping vector cleanup", exc_info=True)
            store = None

        if store is not None:
            for source in doomed:
                try:
                    store.delete_by_source(source.id, nb.id)
                except Exception:
                    # Best-effort: keep going so one failure does not block
                    # cleanup of the remaining matches.
                    log.warning(
                        "Failed to delete vectors for source %s", source.id, exc_info=True
                    )

    nb.sources = [s for s in nb.sources if s.filename != source_name]
    return state, render_source_list(state), render_source_header(state), get_source_choices(state)
def run_ingestion_pipeline(state: UserData) -> tuple[UserData, str, str, list[str]]:
    """Run every source with status "processing" through the ingestion engine.

    For each pending source ``IngestionManager.process_source`` is called with
    the source, its file path and the notebook id; it returns
    ``(chunk_count, error)``. A truthy error marks the source "failed" and
    stores the message; otherwise the source becomes "ready" with the reported
    chunk count. An exception raised while processing one source is logged and
    recorded as that source's failure, so it cannot leave the remaining
    sources stuck in "processing".

    Args:
        state: Per-user state holding the active notebook.

    Returns:
        (state, source_list_html, header_html, source_choices).
    """
    import logging

    from ingestion_engine import IngestionManager

    nb = get_active_notebook(state)
    if not nb:
        return state, render_source_list(state), render_source_header(state), get_source_choices(state)

    manager = IngestionManager()
    for source in nb.sources:
        if source.status != "processing":
            continue

        try:
            chunk_count, error = manager.process_source(source, source.file_path, nb.id)
        except Exception as exc:
            # Per-source boundary: route unexpected failures into the same
            # error channel process_source uses for expected ones.
            logging.getLogger(__name__).exception("Ingestion crashed for source %s", source.id)
            chunk_count, error = 0, f"{type(exc).__name__}: {exc}"

        if error:
            source.status = "failed"
            source.error_message = error
            source.chunk_count = 0
        else:
            source.status = "ready"
            source.error_message = None
            source.chunk_count = chunk_count

    return state, render_source_list(state), render_source_header(state), get_source_choices(state)