|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import re |
|
|
import streamlit as st |
|
|
import torch |
|
|
from document_registry import DocumentRegistry |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Page-level Streamlit settings: browser-tab title and a wide content layout.
st.set_page_config(layout="wide", page_title="Enterprise Knowledge Assistant")

# Report on stdout whether PyTorch can see a CUDA device.
cuda_available = torch.cuda.is_available()
print("CUDA available:", cuda_available)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Point every Hugging Face cache location at a single scratch directory.
# This runs before the project modules further down are imported, so those
# imports see the environment variables already set.
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

os.environ.update(
    dict.fromkeys(
        (
            "HF_HOME",
            "TRANSFORMERS_CACHE",
            "HF_DATASETS_CACHE",
            "HF_MODULES_CACHE",
        ),
        CACHE_DIR,
    )
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from ingestion import extract_text_from_pdf, chunk_text |
|
|
from vectorstore import build_faiss_index |
|
|
from qa import retrieve_chunks, generate_answer, cache_embeddings, embed_chunks, genai_generate |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_dynamic_suggestions_from_toc(toc, chunks, doc_name="Document", *, generate=None):
    """Generate 5–7 short, natural English questions from the TOC and document text.

    Args:
        toc: Iterable of ``(section, title)`` pairs. Only the title is used.
        chunks: List of document text chunks; the first three are joined and
            truncated to 4000 characters as sample context for the prompt.
        doc_name: Display name of the document, quoted in the prompt.
        generate: Optional callable ``prompt -> str`` used to produce the raw
            suggestions. Defaults to the module-level ``genai_generate``.

    Returns:
        A list of at most 7 unique question strings. When the generator fails
        or yields nothing usable, questions derived from the TOC titles are
        returned; when there are no usable titles either, two generic
        questions are returned.
    """
    generic_questions = ["How do I start using this guide?", "What does this document cover?"]
    if not toc or not chunks:
        return generic_questions

    # Strip only prefixes that look like section numbering ("1.", "1.2",
    # "A.", "B-2"); a plain leading word such as "Getting" must survive.
    section_prefix = r"^\s*(?:\d+(?:[.\-]\d+)*[.)]?|[A-Za-z](?:[.\-]\d+)+[.)]?|[A-Za-z][.)])\s+"
    titles = []
    for _sec, raw_title in toc:
        title = re.sub(section_prefix, "", str(raw_title).strip())
        # Drop dot leaders followed by a page number ("..... 14").
        title = re.sub(r"\.{2,}\s*\d+$", "", title).strip()
        if 4 < len(title) < 120:
            titles.append(title)

    toc_questions = [f"What should I know about {t.rstrip('.')}?" for t in titles[:7]]
    fallback = toc_questions or generic_questions

    context_sample = " ".join(chunks[:3])[:4000]
    toc_lines = "\n".join(f"- {t}" for t in titles[:8])
    prompt = (
        "\n"
        "You are a content assistant. Based on the Table of Contents and the sample document text below,\n"
        "generate 5–7 short, natural user-facing questions.\n"
        "Each question should be under 18 words, end with a question mark, and sound human.\n"
        f'Document: "{doc_name}"\n'
        "\n"
        "TABLE OF CONTENTS:\n"
        f"{toc_lines}\n"
        "\n"
        "SAMPLE TEXT:\n"
        f"{context_sample}\n"
        "\n"
        "Output: Write each question on a new line. Do not invent facts — base questions only on the document.\n"
    )

    try:
        if generate is None:
            generate = genai_generate
        ai_response = generate(prompt)
    except Exception:
        # Suggestions are best-effort: record the failure and fall back
        # instead of breaking document loading.
        import logging

        logging.getLogger(__name__).exception("Suggestion generation failed; using fallback questions")
        return fallback

    questions, seen = [], set()
    for line in str(ai_response or "").splitlines():
        # Remove list markers / numbering the model may have added.
        q = re.sub(r"^[\-\u2022\*\d\.\)\s]+", "", line).strip()
        if not q.endswith("?"):
            looks_like_question = len(q.split()) < 18 and re.match(
                r"(?i)^(what|how|why|where|who|when|which|can|does|is|are)\b", q
            )
            if not looks_like_question:
                # Preamble or commentary from the model, not a question.
                continue
            q += "?"
        if not 8 <= len(q) <= 140:
            continue
        key = q.lower()
        if key not in seen:
            seen.add(key)
            questions.append(q)

    return (questions or fallback)[:7]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Global look-and-feel: container width, heading colour, suggestion chips,
# the answer card, text-input styling and small links.
_APP_CSS = """
<style>
div.block-container {padding-top: 1.2rem; max-width: 1080px;}
h1, h2, h3 {color: #f3f4f6; font-weight: 600;}
.suggest-chip {
background: #0f1724;
border: 1px solid #374151;
border-radius: 14px;
color: #e6eef8;
padding: 8px 12px;
cursor: pointer;
font-size: 13px;
margin: 6px 6px 6px 0;
display: inline-block;
transition: background 0.2s, transform 0.1s;
}
.suggest-chip:hover {background: #1e3a8a; transform: translateY(-2px);}
.answer-box {
background: linear-gradient(180deg,#0b1220,#071027);
border-left: 4px solid #3b82f6;
border-radius: 8px;
padding: 16px 18px;
color: #e6eef8;
margin-top: 12px;
box-shadow: 0 4px 14px rgba(0,0,0,0.35);
}
.stTextInput > div > div > input {
background-color: #0f172a !important;
color: #f1f5f9 !important;
border-radius: 6px !important;
border: 1px solid #334155 !important;
padding: 8px 10px !important;
font-size: 15px !important;
}
.stTextInput > label {font-weight: 500;}
.small-link {
font-size: 13px;
color: #60a5fa;
cursor: pointer;
}
</style>
"""

# Raw HTML must be enabled for the <style> element to reach the page.
st.markdown(_APP_CSS, unsafe_allow_html=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# One set of defaults for both the developer sliders and the regular path,
# so opening the developer panel does not silently change retrieval settings.
DEFAULT_CHUNK_SIZE, DEFAULT_OVERLAP, DEFAULT_TOP_K = 1000, 120, 5

with st.sidebar:
    st.markdown("### 🧠 Response Style")
    mode = st.radio(
        "Response style",  # non-empty label kept for accessibility, hidden visually
        ("Strict (Document-only)", "Extended (Document + General)"),
        index=0,
        help="Strict = answers only from the uploaded document. Extended = may include related general info.",
        label_visibility="collapsed",
    )

    st.markdown("---")

    show_dev = st.checkbox("Show advanced settings (for developers)", value=False)
    if show_dev:
        st.markdown("### ⚙️ Developer Options")
        chunk_size = st.slider("Chunk Size", 200, 1500, DEFAULT_CHUNK_SIZE, step=50)
        # Keep the overlap strictly below the chunk size.
        # NOTE(review): presumably chunk_text advances by chunk_size - overlap,
        # so overlap == chunk_size would not make progress — confirm.
        max_overlap = min(200, chunk_size - 50)
        overlap = st.slider(
            "Chunk Overlap", 50, max_overlap, min(DEFAULT_OVERLAP, max_overlap), step=10
        )
        top_k = st.slider("Top K Results", 1, 10, DEFAULT_TOP_K)
    else:
        chunk_size, overlap, top_k = DEFAULT_CHUNK_SIZE, DEFAULT_OVERLAP, DEFAULT_TOP_K

    st.markdown("---")
    st.caption("✨ Built by Shubham Sharma")

    if show_dev:
        st.markdown("---")
        with st.expander("🧩 Developer Insights", expanded=False):
            # The sidebar renders before the query is answered further down,
            # so these are the chunks stored by the previous script run.
            st.markdown("**Retrieved Chunks (Context):**")
            for i, r in enumerate(st.session_state.get("retrieved", []), start=1):
                st.markdown(f"- **Chunk {i}:** {r}")

            toc_data = st.session_state.get("toc", [])
            if toc_data:
                st.markdown("---")
                st.markdown("**Document Sections (TOC):**")
                toc_text = "\n".join(f"{sec}. {title}" for sec, title in toc_data)
                # Distinct, hidden labels keep the two text areas from
                # sharing an auto-generated widget identity.
                st.text_area(
                    "Document sections", toc_text, height=120, label_visibility="collapsed"
                )

            doc_text = st.session_state.get("text", "")
            if doc_text:
                st.markdown("---")
                st.markdown("**Document Preview:**")
                st.text_area(
                    "Document preview", doc_text[:1000], height=120, label_visibility="collapsed"
                )
                st.caption(f"{len(st.session_state.get('chunks', []))} chunks processed.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Session-state keys the app relies on, with the value each one starts at.
_SESSION_DEFAULTS = (
    ("user_query_input", ""),
    ("show_more", False),
    ("selected_suggestion", None),
    ("query_suggestions_fixed", None),
    ("last_doc", None),
    ("doc_lang", "en"),
    ("doc_ready", False),
)

# Seed only the missing keys so values from earlier reruns are preserved.
for _state_key, _initial_value in _SESSION_DEFAULTS:
    if _state_key in st.session_state:
        continue
    st.session_state[_state_key] = _initial_value
|
|
|
|
|
def set_user_query(q, idx):
    """Put a suggested question into the query box and rerun the script.

    Args:
        q: Question text to store under the ``user_query_input`` state key.
        idx: Position of the clicked suggestion, stored under
            ``selected_suggestion``.
    """
    st.session_state["user_query_input"] = q
    st.session_state["selected_suggestion"] = idx
    # st.experimental_rerun() is deprecated; the rest of this file already
    # uses st.rerun().
    st.rerun()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Page header and document-source selector.
# NOTE(review): the original title emoji was lost to mis-encoding; the book
# emoji is a stand-in — confirm the intended glyph.
st.title("📘 Enterprise Knowledge Assistant")
st.caption("Query SAP documentation and enterprise PDFs — powered by reasoning and retrieval.")

# The first option is a placeholder so nothing is processed until the user chooses.
doc_choice = st.radio(
    "Select a document:",
    ["-- Select --", "Sample PDF", "Upload Custom PDF"],
    index=0,
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import hashlib |
|
|
|
|
|
def _hash_content(file_path):
    """Return the first 12 hex characters of the SHA-256 digest of a file's bytes.

    The file is opened in binary mode and consumed in 8192-byte blocks, so
    the whole file is never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # iter() with a sentinel stops on the empty read that marks end of file.
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    short_hash = digest.hexdigest()[:12]
    return short_hash
|
|
|
|
|
import html  # used to escape model output before it is embedded in raw HTML

# Main flow: choose a document, (re)build its index when needed, then answer
# questions against it.
# NOTE(review): several emoji in this section were lost to mis-encoding; the
# ones on the "Parsing", "ready for queries" and "Generating" messages are
# stand-ins — confirm the intended glyphs.
if doc_choice == "-- Select --":
    st.info("⬅️ Select or upload a document to begin.")
else:
    if doc_choice == "Sample PDF":
        temp_path = os.path.join(os.path.dirname(__file__), "sample.pdf")
        if not os.path.isfile(temp_path):
            st.error("The bundled sample PDF could not be found.")
            st.stop()
        st.markdown("✅ **Sample PDF selected.** Preparing document...")
    else:
        uploaded_file = st.file_uploader(
            "Upload a PDF document (max 200MB):", type="pdf", label_visibility="collapsed"
        )
        if not uploaded_file:
            st.stop()
        # The file name comes from the client: keep only its final component
        # so it cannot escape /tmp (absolute path or "../" segments).
        safe_name = os.path.basename(uploaded_file.name) or "uploaded.pdf"
        temp_path = os.path.join("/tmp", safe_name)
        with open(temp_path, "wb") as f:
            f.write(uploaded_file.getbuffer())

    doc_name = os.path.basename(temp_path)
    file_hash = _hash_content(temp_path)
    # Identity covers content AND chunking settings, so changing the
    # developer sliders re-processes the document instead of being ignored.
    doc_identifier = f"{doc_name}_{file_hash}_{chunk_size}_{overlap}"

    needs_processing = (
        not st.session_state.get("doc_ready")
        or st.session_state.get("last_doc") != doc_identifier
    )

    if needs_processing:
        status = st.empty()
        status.info("📤 Upload complete — reading document...")

        text, toc, _toc_source = extract_text_from_pdf(temp_path)
        status.info("📄 Parsing and chunking document...")
        chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        if not chunks:
            status.error("No extractable text was found in this document.")
            st.stop()

        status.info("🧠 Building embeddings and search index...")
        # Keyed by the full identifier rather than the bare file name so a
        # different file with the same name cannot reuse stale embeddings.
        # NOTE(review): assumes cache_embeddings treats its first argument as
        # an opaque cache key — confirm against qa.cache_embeddings.
        embeddings = cache_embeddings(doc_identifier, chunks, embed_chunks)
        index = build_faiss_index(embeddings)

        registry = st.session_state.get("registry")
        if registry is None:
            registry = DocumentRegistry()
            st.session_state["registry"] = registry
        registry.register(temp_path, chunks, embeddings, index)

        success_text = "✅ Document processed successfully — all set to query your assistant!"
        status.success(success_text)

        st.session_state.update({
            "text": text,
            "toc": toc,
            "chunks": chunks,
            "embeddings": embeddings,
            "index": index,
            "doc_ready": True,
            "last_doc": doc_identifier,
            "status_text": success_text,
        })

        query_suggestions = generate_dynamic_suggestions_from_toc(toc, chunks, doc_name)
        st.session_state["query_suggestions_fixed"] = query_suggestions
        # Reset per-document UI state before the rerun renders the query UI.
        st.session_state["user_query_input"] = ""
        st.session_state["selected_suggestion"] = None
        st.session_state["show_more"] = False
        st.rerun()
    else:
        text = st.session_state["text"]
        toc = st.session_state["toc"]
        chunks = st.session_state["chunks"]
        embeddings = st.session_state["embeddings"]
        index = st.session_state["index"]
        query_suggestions = st.session_state.get("query_suggestions_fixed", [])
        st.info(st.session_state.get("status_text", f"📘 {doc_name} is ready for queries."))

    st.markdown("### 💬 Ask the Assistant")
    if query_suggestions:
        expanded = st.session_state["show_more"]
        visible = query_suggestions if expanded else query_suggestions[:3]
        cols = st.columns(min(3, len(visible)))
        for i, q in enumerate(visible):
            if cols[i % len(cols)].button(f"💬 {q}", key=f"sugg_{i}"):
                set_user_query(q, i)

        # The toggle is only meaningful when something is actually hidden.
        if len(query_suggestions) > 3:
            toggle_text = "Show less ▲" if expanded else "Show more ▼"
            if st.button(toggle_text, help="Show or hide more suggestions"):
                st.session_state["show_more"] = not expanded
                st.rerun()

    user_query = st.text_input("Type your question or click one above:", key="user_query_input")

    if user_query.strip():
        reasoning_mode = mode == "Extended (Document + General)"
        with st.spinner("🔍 Generating your answer..."):
            retrieved = retrieve_chunks(user_query, index, chunks, top_k=top_k, embeddings=embeddings)
            answer = generate_answer(user_query, retrieved, reasoning_mode=reasoning_mode)
            st.session_state["retrieved"] = retrieved

        st.markdown("### 🤖 Assistant’s Answer")
        is_warning = answer.startswith("⚠️")
        # The answer is model output derived from an uploaded file: escape it
        # before interpolating into markup rendered with unsafe_allow_html.
        # NOTE(review): assumes generate_answer returns text/markdown, not
        # intentional HTML — confirm.
        answer_html = html.escape(answer, quote=False)
        if not reasoning_mode and not is_warning:
            # Strict mode: drop bold markers and turn "- " bullets into
            # explicit line breaks inside the answer card.
            answer_html = re.sub(r"\*\*(.*?)\*\*", r"\1", answer_html)
            answer_html = re.sub(r"(^|\n)-\s*", r"\1<br>• ", answer_html)
        st.markdown(f"<div class='answer-box'>{answer_html}</div>", unsafe_allow_html=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Cap the height of expanders inside the sidebar and let them scroll.
_SIDEBAR_EXPANDER_CSS = """
<style>
section[data-testid="stSidebar"] div.stExpander {
max-height: 480px;
overflow-y: auto;
}
</style>
"""

st.markdown(_SIDEBAR_EXPANDER_CSS, unsafe_allow_html=True)
|
|
|