"""Papertrail Streamlit app: load a document, index it, then chat with it."""

import html

import streamlit as st

from papertrail.ingest.pdf import load_pdf
from papertrail.ingest.web import load_url
from papertrail.qa import answer_question, build_knowledge_base
from papertrail.utils.text import clean_raw_passage
from papertrail.utils.stream import stream_words_into_bubble

# ── Page config ────────────────────────────────────────────────────────────────
st.set_page_config(
    page_title="Papertrail",
    page_icon="📄",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ── Styles ─────────────────────────────────────────────────────────────────────
# NOTE(review): the literals rendered with unsafe_allow_html=True contain no
# markup in the reviewed copy of this file; they are reproduced as seen.
# Confirm against the original before shipping.
st.markdown(""" """, unsafe_allow_html=True)

# ── Constants ──────────────────────────────────────────────────────────────────
# Single source of truth for the answer-mode selectbox (options and index lookup).
ANSWER_MODES = ["Structured (no LLM)", "Local (Ollama)", "Hugging Face (best effort)"]

# Keys seeded into st.session_state on first run; existing values are kept.
SESSION_DEFAULTS = {
    "retriever": None,
    "source_name": None,
    "chunk_count": 0,
    "messages": [],
    "source_type": "URL",
    "answer_mode": "Hugging Face (best effort)",
    "pdf_bytes": None,
    "pdf_name": "",
}


def _show_supporting_passages(extras):
    """Render every (chunk, score) pair of ``extras`` inside an expander."""
    with st.expander("Show supporting passages"):
        for chunk, passage_score in extras:
            st.caption(f"score: {passage_score:.3f}")
            st.write(clean_raw_passage(chunk))


# ── Session state ──────────────────────────────────────────────────────────────
for state_key, default_value in SESSION_DEFAULTS.items():
    st.session_state.setdefault(state_key, default_value)

# ── Sidebar ────────────────────────────────────────────────────────────────────
with st.sidebar:
    st.markdown("## 📄 Papertrail")
    st.markdown("Load a document, then ask it anything.")
    st.markdown("---")
    st.radio(
        "Source",
        ["URL", "PDF Upload", "Paste Text"],
        key="source_type",
        label_visibility="collapsed",
    )
    source_type = st.session_state.source_type

    if source_type == "PDF Upload":
        uploaded = st.file_uploader("Upload PDF", type=["pdf"], label_visibility="collapsed")
        if uploaded is not None:
            fb = uploaded.read()
            # Only overwrite the stashed bytes when the read actually yielded data.
            if fb:
                st.session_state.pdf_bytes = fb
                st.session_state.pdf_name = uploaded.name

        if st.session_state.pdf_bytes and st.button("Build Knowledge Base", key="build_pdf"):
            with st.spinner("Reading PDF..."):
                text, section_map, page_map, err = load_pdf(st.session_state.pdf_bytes)
            if err:
                st.error(err)
            else:
                with st.spinner("Indexing..."):
                    err = build_knowledge_base(
                        text,
                        st.session_state.pdf_name,
                        section_map=section_map,
                        page_map=page_map,
                    )
                if err:
                    st.error(err)
                else:
                    st.session_state.pdf_bytes = None
                    st.rerun()

    # NOTE(review): nesting of the controls below under this condition is
    # reconstructed from statement order — confirm against the original layout.
    if st.session_state.retriever:
        st.markdown("---")
        st.caption(f"Active: {str(st.session_state.source_name)[:40]}")
        st.caption(f"{st.session_state.chunk_count} chunks indexed")
        st.session_state.answer_mode = st.selectbox(
            "Answer mode",
            ANSWER_MODES,
            index=ANSWER_MODES.index(st.session_state.answer_mode),
        )
        if st.button("Clear & start over"):
            st.session_state.retriever = None
            st.session_state.source_name = None
            st.session_state.chunk_count = 0
            st.session_state.messages = []
            st.session_state.pdf_bytes = None
            st.session_state.pdf_name = ""
            st.rerun()

# ── Header ─────────────────────────────────────────────────────────────────────
st.markdown("\nPapertrail\n", unsafe_allow_html=True)
st.markdown("\nAsk anything from your document\n", unsafe_allow_html=True)

# ── Source input ───────────────────────────────────────────────────────────────
if source_type == "URL":
    col1, col2 = st.columns([5, 1])
    with col1:
        url_val = st.text_input(
            "URL",
            placeholder="https://example.com/article",
            label_visibility="collapsed",
            key="url_input",
        )
    with col2:
        fetch_clicked = st.button("Fetch", use_container_width=True)

    if fetch_clicked:
        # Strip once so the fetcher and the stored source name see the same URL.
        url = url_val.strip()
        if not url:
            st.warning("Enter a URL first.")
        else:
            with st.spinner("Fetching..."):
                text, section_map, err = load_url(url)
            if err:
                st.error(err)
                st.info("Tip: copy the page text and use Paste Text instead.")
            else:
                with st.spinner("Indexing..."):
                    err = build_knowledge_base(text, url, section_map=section_map)
                if err:
                    st.error(err)
                else:
                    st.rerun()

elif source_type == "Paste Text":
    pasted = st.text_area(
        "Paste text",
        height=200,
        placeholder="Paste any text here -- articles, docs, notes...",
        label_visibility="collapsed",
        key="paste_input",
    )
    if st.button("Build Knowledge Base", key="build_paste"):
        pasted_text = pasted.strip()
        if not pasted_text:
            st.warning("Paste some text first.")
        else:
            with st.spinner("Indexing..."):
                err = build_knowledge_base(pasted_text, "Pasted text")
            if err:
                st.error(err)
            else:
                st.rerun()

# ── Source badge ───────────────────────────────────────────────────────────────
if st.session_state.retriever and st.session_state.source_name:
    # The source name can be a user-supplied URL or file name: escape it.
    src = html.escape(str(st.session_state.source_name))
    st.markdown(
        f"\n{src}"
        f"{st.session_state.chunk_count} chunks\n",
        unsafe_allow_html=True,
    )

# ── Empty state ────────────────────────────────────────────────────────────────
if not st.session_state.retriever:
    if source_type == "PDF Upload":
        empty_hint = "Upload a PDF from the sidebar to begin."
    else:
        empty_hint = "Load a document above to begin."
    st.markdown(f"\n{empty_hint}\n", unsafe_allow_html=True)
elif not st.session_state.messages:
    st.markdown(
        "\nKnowledge base ready -- ask your first question.\n",
        unsafe_allow_html=True,
    )

# ── Chat history ───────────────────────────────────────────────────────────────
for entry in st.session_state.messages:
    if entry.get("role") == "user":
        # `or ""` also covers an explicit None, matching the assistant branch.
        user_text = html.escape(entry.get("content") or "")
        st.markdown(f"\n{user_text}\n", unsafe_allow_html=True)
    else:
        reply_text = html.escape(entry.get("answer_text") or "")
        st.markdown(f"\n{reply_text}\n", unsafe_allow_html=True)
        if entry.get("attribution_html"):
            st.markdown(entry["attribution_html"], unsafe_allow_html=True)
        if entry.get("extras"):
            _show_supporting_passages(entry["extras"])

# ── Live input ─────────────────────────────────────────────────────────────────
if st.session_state.retriever:
    question = st.chat_input("Ask a question about your document...")
    if question and question.strip():
        q = question.strip()
        st.markdown(f"\n{html.escape(q)}\n", unsafe_allow_html=True)

        # Temporary "typing" indicator shown while the answer is computed.
        typing_ph = st.empty()
        typing_ph.markdown(
            "\nassistant\n"
            "\n\nTyping\n\n",
            unsafe_allow_html=True,
        )
        try:
            answer_html, answer_text, _score, extras, attribution_html = answer_question(
                st.session_state.retriever, q
            )
        finally:
            # Remove the indicator even when answering fails, so it does not
            # linger next to the error Streamlit displays.
            typing_ph.empty()

        stream_words_into_bubble(answer_text, source_label="assistant")
        if attribution_html:
            st.markdown(attribution_html, unsafe_allow_html=True)
        if extras:
            _show_supporting_passages(extras)

        # Persist the exchange so the "Chat history" section replays it on rerun.
        st.session_state.messages.append({"role": "user", "content": q})
        st.session_state.messages.append({
            "role": "assistant",
            "content": answer_html,
            "extras": extras,
            "attribution_html": attribution_html,
            "answer_text": answer_text,
        })

        # NOTE(review): the payload of this call is empty in the reviewed copy
        # and its original nesting level is not recoverable — confirm both.
        st.markdown(
            "",
            unsafe_allow_html=True,
        )