""" Antigravity Notebook - Streamlit UI NotebookLM-style interface for the Antigravity Notebook system. """ import streamlit as st import requests from typing import List, Dict, Optional import plotly.graph_objects as go from datetime import datetime # API Configuration API_BASE_URL = "http://localhost:8000" def init_session_state(): """Initialize session state variables""" if "selected_notebook_id" not in st.session_state: st.session_state.selected_notebook_id = None if "chat_messages" not in st.session_state: st.session_state.chat_messages = [] if "notebooks" not in st.session_state: st.session_state.notebooks = [] def api_call(method: str, endpoint: str, **kwargs): """Make API call with error handling""" url = f"{API_BASE_URL}{endpoint}" try: response = requests.request(method, url, **kwargs) response.raise_for_status() return response.json() if response.text else None except requests.exceptions.RequestException as e: st.error(f"API Error: {e}") return None def load_notebooks() -> List[Dict]: """Load all notebooks""" data = api_call("GET", "/notebooks/") return data if data else [] def create_notebook(name: str, description: str = ""): """Create a new notebook""" return api_call("POST", "/notebooks/", json={ "name": name, "description": description }) def get_notebook_details(notebook_id: str) -> Optional[Dict]: """Get detailed notebook information""" return api_call("GET", f"/notebooks/{notebook_id}") def get_notebook_stats(notebook_id: str) -> Optional[Dict]: """Get notebook statistics""" return api_call("GET", f"/notebooks/{notebook_id}/stats") def upload_pdf(notebook_id: str, file): """Upload a PDF file""" files = {"file": (file.name, file, "application/pdf")} return api_call("POST", f"/sources/notebooks/{notebook_id}/sources/upload", files=files) def add_url_source(notebook_id: str, url: str, title: str = None): """Add a URL source""" return api_call("POST", f"/sources/notebooks/{notebook_id}/sources/url", json={ "url": url, "title": title }) def add_text_source(notebook_id: str, content: str, title: str): """Add text source""" return api_call("POST", f"/sources/notebooks/{notebook_id}/sources/text", json={ "content": content, "title": title }) def delete_source(source_id: str): """Delete a source""" return api_call("DELETE", f"/sources/sources/{source_id}") def query_notebook(notebook_id: str, query: str) -> Optional[Dict]: """Query the notebook""" return api_call("POST", f"/chat/notebooks/{notebook_id}/chat", json={ "query": query, "include_history": True }) def get_chat_history(notebook_id: str) -> List[Dict]: """Get chat history""" data = api_call("GET", f"/chat/notebooks/{notebook_id}/messages") return data if data else [] def render_memory_gauge(stats: Dict): """Render memory usage gauge""" usage_percent = stats.get("context_usage_percent", 0) fig = go.Figure(go.Indicator( mode="gauge+number+delta", value=usage_percent, title={'text': "Context Usage"}, delta={'reference': 100}, gauge={ 'axis': {'range': [None, 100]}, 'bar': {'color': "darkblue"}, 'steps': [ {'range': [0, 50], 'color': "lightgreen"}, {'range': [50, 80], 'color': "yellow"}, {'range': [80, 100], 'color': "orange"} ], 'threshold': { 'line': {'color': "red", 'width': 4}, 'thickness': 0.75, 'value': 100 } } )) fig.update_layout(height=200, margin=dict(l=20, r=20, t=40, b=20)) st.plotly_chart(fig, use_container_width=True) # Stats details st.caption(f"📊 {stats.get('total_tokens', 0):,} / {stats.get('max_context_tokens', 0):,} tokens") if stats.get('can_fit_full_context', True): st.success("✅ Full notebook fits in context!") else: st.warning("⚠️ Using selective retrieval") def render_sidebar(): """Render the left sidebar with notebook and source management""" with st.sidebar: st.title("📚 Antigravity Notebook") # Notebook selector st.subheader("Notebooks") notebooks = load_notebooks() st.session_state.notebooks = notebooks if notebooks: notebook_options = {nb["name"]: nb["id"] for nb in notebooks} selected_name = st.selectbox( "Select Notebook", options=list(notebook_options.keys()), key="notebook_selector" ) st.session_state.selected_notebook_id = notebook_options[selected_name] else: st.info("No notebooks yet. Create one below!") st.session_state.selected_notebook_id = None # Create new notebook with st.expander("➕ Create New Notebook"): new_name = st.text_input("Name", key="new_notebook_name") new_desc = st.text_area("Description (optional)", key="new_notebook_desc") if st.button("Create Notebook"): if new_name: result = create_notebook(new_name, new_desc) if result: st.success(f"Created: {new_name}") st.rerun() else: st.error("Name is required") st.divider() # Sources management (only if notebook selected) if st.session_state.selected_notebook_id: notebook_id = st.session_state.selected_notebook_id # Get notebook details details = get_notebook_details(notebook_id) stats = get_notebook_stats(notebook_id) if details and stats: st.subheader("📁 Sources") # Display sources sources = details.get("sources", []) if sources: for source in sources: with st.container(): col1, col2 = st.columns([3, 1]) with col1: icon = {"pdf": "📄", "url": "🌐", "text": "📝"}.get( source["source_type"], "📎" ) st.write(f"{icon} {source['filename']}") st.caption(f"{source['total_tokens']} tokens") with col2: if st.button("🗑️", key=f"delete_{source['id']}"): delete_source(source['id']) st.rerun() st.divider() else: st.info("No sources yet. Add some below!") # Add sources st.subheader("➕ Add Source") tab1, tab2, tab3 = st.tabs(["📄 PDF", "🌐 URL", "📝 Text"]) with tab1: uploaded_file = st.file_uploader( "Upload PDF", type=["pdf"], key="pdf_uploader" ) if uploaded_file and st.button("Upload PDF"): with st.spinner("Processing PDF..."): result = upload_pdf(notebook_id, uploaded_file) if result: st.success("PDF uploaded!") st.rerun() with tab2: url_input = st.text_input("URL", key="url_input") url_title = st.text_input("Title (optional)", key="url_title") if st.button("Add URL"): if url_input: with st.spinner("Scraping URL..."): result = add_url_source( notebook_id, url_input, url_title if url_title else None ) if result: st.success("URL added!") st.rerun() else: st.error("URL is required") with tab3: text_title = st.text_input("Title", key="text_title") text_content = st.text_area("Content", key="text_content", height=150) if st.button("Add Text"): if text_title and text_content: with st.spinner("Processing text..."): result = add_text_source(notebook_id, text_content, text_title) if result: st.success("Text added!") st.rerun() else: st.error("Title and content are required") st.divider() # Memory gauge st.subheader("🧠 Memory Usage") render_memory_gauge(stats) def render_main_chat(): """Render the main chat interface""" st.title("💬 Chat with Your Notebook") if not st.session_state.selected_notebook_id: st.info("👈 Select or create a notebook to start chatting") return notebook_id = st.session_state.selected_notebook_id # Get notebook details details = get_notebook_details(notebook_id) if not details: st.error("Failed to load notebook") return st.subheader(f"📓 {details['name']}") if details.get('description'): st.caption(details['description']) # Check if sources exist if not details.get('sources'): st.warning("⚠️ No sources in this notebook. Add some sources to start chatting!") return st.divider() # Load chat history chat_history = get_chat_history(notebook_id) # Display chat history for message in chat_history: role = message["role"] content = message["content"] if role == "user": with st.chat_message("user"): st.write(content) else: with st.chat_message("assistant"): st.write(content) # Show sources if available sources = message.get("sources_used", []) if sources: with st.expander("📚 Sources"): unique_sources = {} for src in sources: key = src["source_id"] if key not in unique_sources: unique_sources[key] = src for src in unique_sources.values(): st.caption(f"• {src['filename']} ({src['source_type']})") # Chat input user_query = st.chat_input("Ask a question about your sources...") if user_query: # Display user message with st.chat_message("user"): st.write(user_query) # Query the notebook with st.chat_message("assistant"): with st.spinner("Thinking..."): response = query_notebook(notebook_id, user_query) if response: st.write(response["response"]) # Show sources sources = response.get("sources_used", []) if sources: with st.expander("📚 Sources"): unique_sources = {} for src in sources: key = src["source_id"] if key not in unique_sources: unique_sources[key] = src for src in unique_sources.values(): st.caption(f"• {src['filename']} ({src['source_type']})") # Show token usage token_usage = response.get("token_usage", {}) if token_usage: st.caption( f"📊 Used {token_usage.get('selected_tokens', 0):,} / " f"{token_usage.get('max_tokens', 0):,} tokens " f"({token_usage.get('usage_percent', 0):.1f}%)" ) else: st.error("Failed to get response") # Rerun to show the new message in history st.rerun() def main(): """Main application""" st.set_page_config( page_title="Antigravity Notebook", page_icon="📚", layout="wide", initial_sidebar_state="expanded" ) init_session_state() # Render sidebar render_sidebar() # Render main chat area render_main_chat() if __name__ == "__main__": main()