import html
import streamlit as st
from papertrail.ingest.pdf import load_pdf
from papertrail.ingest.web import load_url
from papertrail.qa import answer_question, build_knowledge_base
from papertrail.utils.text import clean_raw_passage
from papertrail.utils.stream import stream_words_into_bubble
# ── Page config ────────────────────────────────────────────────────────────────
# Browser-tab title and icon, wide layout, and a sidebar that starts expanded.
_page_settings = {
    "page_title": "Papertrail",
    "page_icon": "📄",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_page_settings)
# ── Styles ─────────────────────────────────────────────────────────────────────
# Emit the page-level styling through the raw-HTML markdown path.
# NOTE(review): the literal below contains only a newline, so nothing is
# actually injected -- presumably the stylesheet body is missing from this
# copy of the file; confirm against the intended CSS.
st.markdown("""
""", unsafe_allow_html=True)
# ── Session state ──────────────────────────────────────────────────────────────
# Seed every session key the page reads. setdefault only fills in keys that
# are missing, so values already stored in the session are left untouched.
_session_defaults = {
    "retriever": None,
    "source_name": None,
    "chunk_count": 0,
    "messages": [],
    "source_type": "URL",
    "answer_mode": "Hugging Face (best effort)",
    "pdf_bytes": None,
    "pdf_name": "",
}
for state_key, default_value in _session_defaults.items():
    st.session_state.setdefault(state_key, default_value)
# ── Sidebar ────────────────────────────────────────────────────────────────────
# Sidebar: source picker, PDF upload/indexing, and controls for the active
# document.
# NOTE(review): block nesting was reconstructed from a flattened source --
# confirm that the answer-mode selector and the clear button belong under the
# "document loaded" branch.
with st.sidebar:
    st.markdown("## 📄 Papertrail")
    st.markdown("Load a document, then ask it anything.")
    st.markdown("---")

    # The radio stores its selection in st.session_state.source_type via `key`.
    st.radio(
        "Source",
        ["URL", "PDF Upload", "Paste Text"],
        key="source_type",
        label_visibility="collapsed",
    )
    source_type = st.session_state.source_type

    if source_type == "PDF Upload":
        uploaded = st.file_uploader("Upload PDF", type=["pdf"], label_visibility="collapsed")
        if uploaded is not None:
            fb = uploaded.read()
            # Only replace the stashed upload when bytes actually came back,
            # so an empty read does not discard a previously stored file.
            if fb:
                st.session_state.pdf_bytes = fb
                st.session_state.pdf_name = uploaded.name

        if st.session_state.pdf_bytes and st.button("Build Knowledge Base", key="build_pdf"):
            with st.spinner("Reading PDF..."):
                text, section_map, page_map, err = load_pdf(st.session_state.pdf_bytes)
            if err:
                st.error(err)
            else:
                with st.spinner("Indexing..."):
                    err = build_knowledge_base(
                        text,
                        st.session_state.pdf_name,
                        section_map=section_map,
                        page_map=page_map,
                    )
                if err:
                    st.error(err)
                else:
                    # Release the raw bytes once indexing succeeded, then rerun.
                    st.session_state.pdf_bytes = None
                    st.rerun()

    if st.session_state.retriever:
        st.markdown("---")
        st.caption(f"Active: {str(st.session_state.source_name)[:40]}")
        st.caption(f"{st.session_state.chunk_count} chunks indexed")

        # Single source of truth for the choices (previously spelled out twice).
        answer_modes = [
            "Structured (no LLM)",
            "Local (Ollama)",
            "Hugging Face (best effort)",
        ]
        current_mode = st.session_state.answer_mode
        # A stored value that is not among the choices would make .index()
        # raise ValueError; fall back to the last entry, which is the value
        # the session is seeded with.
        if current_mode in answer_modes:
            selected_index = answer_modes.index(current_mode)
        else:
            selected_index = len(answer_modes) - 1
        st.session_state.answer_mode = st.selectbox(
            "Answer mode",
            answer_modes,
            index=selected_index,
        )

        if st.button("Clear & start over"):
            # Reset every document-related key to its initial value.
            st.session_state.retriever = None
            st.session_state.source_name = None
            st.session_state.chunk_count = 0
            st.session_state.messages = []
            st.session_state.pdf_bytes = None
            st.session_state.pdf_name = ""
            st.rerun()
# ── Header ─────────────────────────────────────────────────────────────────────
# Page header, rendered through the raw-HTML markdown path.
# The original single-quoted literals were split across two lines, which is a
# syntax error; the line break is now written as an explicit "\n" so the
# rendered text is unchanged.
# NOTE(review): neither literal carries any HTML even though
# unsafe_allow_html is set -- presumably the wrapping markup was lost; confirm.
st.markdown("\n", unsafe_allow_html=True)
st.markdown("Ask anything from your document\n", unsafe_allow_html=True)
# ── Source input ───────────────────────────────────────────────────────────────
# Main-area input for the URL and Paste Text sources (the PDF flow lives in
# the sidebar). Each branch reports loader/indexer errors inline and reruns
# the script after a successful build.
if source_type == "URL":
    col1, col2 = st.columns([5, 1])
    with col1:
        url_val = st.text_input(
            "URL",
            placeholder="https://example.com/article",
            label_visibility="collapsed",
            key="url_input",
        )
    with col2:
        fetch_clicked = st.button("Fetch", use_container_width=True)

    if fetch_clicked:
        # Strip once so the emptiness check, the fetch, and the stored source
        # name all use the same URL (the fetch previously got the raw value).
        url = url_val.strip()
        if not url:
            st.warning("Enter a URL first.")
        else:
            with st.spinner("Fetching..."):
                text, section_map, err = load_url(url)
            if err:
                st.error(err)
                st.info("Tip: copy the page text and use Paste Text instead.")
            else:
                with st.spinner("Indexing..."):
                    err = build_knowledge_base(text, url, section_map=section_map)
                if err:
                    st.error(err)
                else:
                    st.rerun()

elif source_type == "Paste Text":
    pasted = st.text_area(
        "Paste text",
        height=200,
        placeholder="Paste any text here -- articles, docs, notes...",
        label_visibility="collapsed",
        key="paste_input",
    )
    if st.button("Build Knowledge Base", key="build_paste"):
        pasted_text = pasted.strip()
        if not pasted_text:
            st.warning("Paste some text first.")
        else:
            with st.spinner("Indexing..."):
                err = build_knowledge_base(pasted_text, "Pasted text")
            if err:
                st.error(err)
            else:
                st.rerun()
# ── Source badge ───────────────────────────────────────────────────────────────
# Badge showing the active source name and how many chunks were indexed.
# The source name is HTML-escaped because it is emitted with
# unsafe_allow_html. The second f-string was previously split across two
# lines (a syntax error); its line break is now an explicit "\n".
# NOTE(review): the two pieces are concatenated with no markup or separator
# between them -- presumably the surrounding HTML was lost; confirm.
if st.session_state.retriever and st.session_state.source_name:
    src = html.escape(str(st.session_state.source_name))
    st.markdown(
        f"{src}"
        f"{st.session_state.chunk_count} chunks\n",
        unsafe_allow_html=True,
    )
# ── Empty state ────────────────────────────────────────────────────────────────
# Placeholder text: a prompt to load a document when nothing is indexed yet,
# or a "ready" hint when a document is loaded but no question has been asked.
# The literals were previously split across two lines (a syntax error); the
# line break is now an explicit "\n" so the rendered text is unchanged.
# NOTE(review): the messages carry no HTML wrapper despite unsafe_allow_html
# -- presumably the markup was lost; confirm.
if not st.session_state.retriever:
    if source_type == "PDF Upload":
        msg = "Upload a PDF from the sidebar to begin."
    else:
        msg = "Load a document above to begin."
    st.markdown(f"{msg}\n", unsafe_allow_html=True)
elif not st.session_state.messages:
    st.markdown(
        "Knowledge base ready -- ask your first question.\n",
        unsafe_allow_html=True,
    )
# ── Chat history ───────────────────────────────────────────────────────────────
# Replay the stored conversation. Message text is HTML-escaped because it is
# emitted with unsafe_allow_html; the stored attribution HTML is emitted as-is.
# The f-strings were previously split across two lines (a syntax error); the
# line break is now an explicit "\n".
# NOTE(review): the bubbles carry no HTML wrapper -- presumably the markup was
# lost; confirm.
for msg in st.session_state.messages:
    if msg.get("role") == "user":
        # `or ""` also covers an explicit None, matching the assistant branch.
        user_text = html.escape(msg.get("content") or "")
        st.markdown(f"{user_text}\n", unsafe_allow_html=True)
    else:
        reply_text = html.escape(msg.get("answer_text") or "")
        st.markdown(f"{reply_text}\n", unsafe_allow_html=True)
        if msg.get("attribution_html"):
            st.markdown(msg["attribution_html"], unsafe_allow_html=True)
        if msg.get("extras"):
            with st.expander("Show supporting passages"):
                for chunk, score in msg["extras"]:
                    st.caption(f"score: {score:.3f}")
                    st.write(clean_raw_passage(chunk))
# ── Live input ─────────────────────────────────────────────────────────────────
# Chat input, shown only once a document is loaded. A submitted question is
# echoed, answered, rendered, and then appended to the stored history.
# The echo f-string was previously split across two lines (a syntax error);
# its line break is now an explicit "\n".
if st.session_state.retriever:
    question = st.chat_input("Ask a question about your document...")
    if question and question.strip():
        q = question.strip()
        st.markdown(f"{html.escape(q)}\n", unsafe_allow_html=True)

        # Placeholder that is cleared as soon as the answer is available.
        # NOTE(review): its payload is an empty string -- presumably the
        # typing-indicator markup was lost; confirm.
        typing_ph = st.empty()
        typing_ph.markdown(
            "",
            unsafe_allow_html=True,
        )

        # The second element of the result is not used on this page.
        answer_html, answer_text, _score, extras, attribution_html = answer_question(
            st.session_state.retriever, q
        )
        typing_ph.empty()

        stream_words_into_bubble(answer_text, source_label="assistant")
        if attribution_html:
            st.markdown(attribution_html, unsafe_allow_html=True)
        if extras:
            with st.expander("Show supporting passages"):
                for chunk, s in extras:
                    st.caption(f"score: {s:.3f}")
                    st.write(clean_raw_passage(chunk))

        # Record both turns so the history section can replay them later.
        st.session_state.messages.append({"role": "user", "content": q})
        st.session_state.messages.append({
            "role": "assistant",
            "content": answer_html,
            "extras": extras,
            "attribution_html": attribution_html,
            "answer_text": answer_text,
        })
# Final raw-HTML markdown call of the page.
# NOTE(review): the payload is an empty string, so nothing visible is emitted
# -- presumably the markup that belonged here is missing from this copy;
# confirm, and confirm which block this call was nested under.
st.markdown(
"",
unsafe_allow_html=True,
)