|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import re |
|
|
import streamlit as st |
|
|
import torch |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Page-level configuration, followed by a one-line GPU availability report on
# stdout.
st.set_page_config(
    page_title="Enterprise Knowledge Assistant",
    layout="wide",
)
print(f"CUDA available: {torch.cuda.is_available()}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Point every Hugging Face cache variable at one directory, created on demand.
# This runs before the project modules (ingestion / vectorstore / qa) are
# imported further down the file.
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update(
    dict.fromkeys(
        ("HF_HOME", "TRANSFORMERS_CACHE", "HF_DATASETS_CACHE", "HF_MODULES_CACHE"),
        CACHE_DIR,
    )
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from ingestion import extract_text_from_pdf, chunk_text |
|
|
from vectorstore import build_faiss_index |
|
|
from qa import retrieve_chunks, generate_answer, cache_embeddings, embed_chunks, genai_generate |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re |
|
|
from langdetect import detect |
|
|
|
|
|
def detect_language(text_sample: str) -> str:
    """Classify *text_sample* as Hindi ("hi") or English ("en").

    Any character in the Devanagari block (U+0900-U+097F) short-circuits to
    "hi". Otherwise ``langdetect.detect`` decides: a code starting with "hi"
    maps to "hi", every other code to "en". Any exception raised along the
    way yields "en".
    """
    try:
        has_devanagari = re.search(r"[\u0900-\u097F]", text_sample) is not None
        if has_devanagari:
            return "hi"

        guessed = detect(text_sample)
        if guessed.startswith("hi"):
            return "hi"
        return "en"
    except Exception:
        return "en"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# A leading section label: "1", "1.2", "1.2.", "3)", "A.1", "B)", "IV." ...
# Ordinary first words ("Introduction", "A Guide", "3D Printing") do not match.
_TOC_LABEL_RE = re.compile(
    r"""^\s*(?:
        \d+(?:[.\-]\d+)*(?:[.):\-]\s*(?!\d)|\s+)
      | [A-Za-z](?:[.\-]\d+)+[.):]?\s+
      | (?:[A-Za-z]|[IVXLCDM]+|[ivxlcdm]+)[.)]\s+
    )""",
    re.VERBOSE,
)
# Trailing dot leaders plus page number, e.g. "Overview ........ 12".
_TOC_LEADER_RE = re.compile(r"\s*\.{2,}\s*\d+\s*$")
# One candidate question: optional bullet, then the shortest run up to "?".
_QUESTION_RE = re.compile(r"[-\u2022]?\s*(.+?)\?")
# List numbering left in front of a question, e.g. "1." or "2)".
_ENUM_PREFIX_RE = re.compile(r"^\s*\d+[.)]\s*")
# Bullet, hyphen, en/em dash, asterisk and space decorations around a question.
_QUESTION_TRIM_CHARS = "\u2022-\u2013\u2014* "


def _clean_toc_title(raw_title):
    """Return a TOC title without its section label and page-number leader."""
    title = _TOC_LABEL_RE.sub("", str(raw_title or ""), count=1)
    title = _TOC_LEADER_RE.sub("", title)
    return title.strip()


def _clean_question(fragment):
    """Return a captured question fragment without bullets or list numbering."""
    text = fragment.strip(_QUESTION_TRIM_CHARS)
    text = _ENUM_PREFIX_RE.sub("", text)
    return text.strip(_QUESTION_TRIM_CHARS)


def generate_dynamic_suggestions_from_toc(toc, chunks, doc_name="Document"):
    """Ask the generative model for suggested questions about a document.

    Args:
        toc: Iterable of ``(section, raw_title)`` pairs.
        chunks: List of text chunks; the first three (capped at 4000
            characters in total) are quoted in the prompt as sample text.
        doc_name: Name shown to the model in the prompt.

    Returns:
        Up to seven de-duplicated (case-insensitive) questions, each ending
        in "?". An empty list when ``toc`` or ``chunks`` is empty. Two
        generic fallback questions when generation or parsing raises.
    """
    if not toc or not chunks:
        return []

    titles = []
    for _sec, raw_title in toc:
        title = _clean_toc_title(raw_title)
        if 4 < len(title) < 120:
            titles.append(title)

    context_sample = " ".join(chunks[:3])[:4000]
    toc_lines = "\n".join(f"- {t}" for t in titles[:8])
    prompt = f"""
You are generating short, natural, and context-aware questions for users reading "{doc_name}".
Use the Table of Contents and some document text for inspiration.

TABLE OF CONTENTS:
{toc_lines}

SAMPLE TEXT:
{context_sample}

Generate 5–7 clear and human-like questions based strictly on this document.
Each should sound natural, under 18 words, and avoid robotic phrasing.
"""

    # Deliberately best-effort: any failure in the model call or in parsing
    # its reply degrades to generic suggestions rather than breaking the page.
    try:
        ai_response = genai_generate(prompt)
        candidates = (_clean_question(q) for q in _QUESTION_RE.findall(ai_response))
        clean_qs = [q + "?" for q in candidates if 8 < len(q) < 120]

        seen, final = set(), []
        for q in clean_qs:
            lowered = q.lower()
            if lowered not in seen:
                seen.add(lowered)
                final.append(q)
        return final[:7]
    except Exception:
        return ["How do I start using this guide?", "What does this document cover?"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Custom styling for the page: layout width, headings, suggestion chips, the
# answer box, text inputs and small links. Injected as raw HTML.
_PAGE_CSS = """
<style>
div.block-container {padding-top: 1.2rem; max-width: 1080px;}
h1, h2, h3 {color: #f3f4f6; font-weight: 600;}
.suggest-chip {
    background: #0f1724;
    border: 1px solid #374151;
    border-radius: 14px;
    color: #e6eef8;
    padding: 8px 12px;
    cursor: pointer;
    font-size: 13px;
    margin: 6px 6px 6px 0;
    display: inline-block;
    transition: background 0.2s, transform 0.1s;
}
.suggest-chip:hover {background: #1e3a8a; transform: translateY(-2px);}
.answer-box {
    background: linear-gradient(180deg,#0b1220,#071027);
    border-left: 4px solid #3b82f6;
    border-radius: 8px;
    padding: 16px 18px;
    color: #e6eef8;
    margin-top: 12px;
    box-shadow: 0 4px 14px rgba(0,0,0,0.35);
}
.stTextInput > div > div > input {
    background-color: #0f172a !important;
    color: #f1f5f9 !important;
    border-radius: 6px !important;
    border: 1px solid #334155 !important;
    padding: 8px 10px !important;
    font-size: 15px !important;
}
.stTextInput > label {font-weight: 500;}
.small-link {
    font-size: 13px;
    color: #60a5fa;
    cursor: pointer;
}
</style>
"""
st.markdown(_PAGE_CSS, unsafe_allow_html=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Sidebar: response-style selector plus optional developer tuning knobs.
# Defines the module-level names `mode`, `chunk_size`, `overlap` and `top_k`
# that the main page flow reads.
with st.sidebar:
    st.markdown("### 🧠 Response Style")
    # NOTE(review): Streamlit discourages empty widget labels. The label is
    # left empty here because collapsing a non-empty label may also hide the
    # help tooltip - confirm before changing.
    mode = st.radio(
        "",
        ("Strict (Document-only)", "Extended (Document + general)"),
        index=0,
        help="Strict = answers only from the uploaded document. Extended = may include related general info.",
    )

    st.markdown("---")
    show_dev = st.checkbox("Show advanced settings (for developers)", value=False)
    if show_dev:
        st.markdown("### ⚙️ Developer Options")
        chunk_size = st.slider("Chunk Size", 200, 1500, 1000, step=50)
        overlap = st.slider("Chunk Overlap", 50, 200, 120, step=10)
        # NOTE(review): this slider defaults Top K to 7 while the
        # non-developer path below uses 5 - confirm the difference is intended.
        top_k = st.slider("Top K Results", 1, 10, 7)
    else:
        chunk_size, overlap, top_k = 1000, 120, 5

    st.markdown("---")
    st.caption("✨ Built by Shubham Sharma")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Seed the session-state keys this page relies on; keys that are already
# present are left untouched.
_SESSION_DEFAULTS = {
    "user_query_input": "",
    "show_more": False,
    "selected_suggestion": None,
    "query_suggestions_fixed": None,
    "last_doc": None,
    "doc_lang": "en",
}
for key, val in _SESSION_DEFAULTS.items():
    if key in st.session_state:
        continue
    st.session_state[key] = val
|
|
|
|
|
def set_user_query(q, idx):
    """Put a suggested question into the query box and rerun the script.

    Args:
        q: Question text stored under the ``user_query_input`` session key.
        idx: Position of the clicked suggestion, stored under
            ``selected_suggestion``.
    """
    st.session_state["user_query_input"] = q
    st.session_state["selected_suggestion"] = idx
    # st.experimental_rerun was deprecated in favour of st.rerun and is gone
    # from recent Streamlit releases; use whichever this install provides.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Page heading, tagline, and the document-source selector. `doc_choice` drives
# the main page flow below.
st.title("π Enterprise Knowledge Assistant")
st.caption(
    "Query SAP documentation and enterprise PDFs β powered by reasoning and retrieval."
)

_doc_options = ["-- Select --", "Sample PDF", "Upload Custom PDF"]
doc_choice = st.radio("Select a document:", _doc_options, index=0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Main page flow: resolve the document, chunk and index it, offer suggested
# questions, answer the user's query, and show supporting material.
#
# NOTE(review): block nesting was reconstructed from a copy of this file that
# had lost its indentation - confirm against the original. Emoji and dashes
# were restored where the mis-encoded bytes identified them; a lone "π" marks
# an emoji that could not be recovered.

# st.experimental_rerun was deprecated in favour of st.rerun and is gone from
# recent Streamlit releases; use whichever this install provides.
_rerun = getattr(st, "rerun", None) or st.experimental_rerun

if doc_choice == "-- Select --":
    st.info("⬅️ Select or upload a document to begin.")
else:
    if doc_choice == "Sample PDF":
        temp_path = os.path.join(os.path.dirname(__file__), "sample.pdf")
        st.success("π Sample document loaded successfully — you can start asking your questions below.")
    else:
        uploaded_file = st.file_uploader("Upload a PDF", type="pdf", label_visibility="collapsed")
        if uploaded_file:
            # The file name comes from the client: keep only its final path
            # component so the write cannot land outside /tmp.
            safe_name = os.path.basename(uploaded_file.name) or "uploaded.pdf"
            temp_path = os.path.join("/tmp", safe_name)
            with open(temp_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            st.success("✅ Document processed successfully — you can start asking your questions below.")
        else:
            temp_path = None

    if temp_path:
        with st.spinner("π Processing document..."):
            text, toc, toc_source = extract_text_from_pdf(temp_path)
            chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)

        # Language is detected from the first three chunks (first 1000 chars)
        # and stored in session state for answer generation.
        doc_sample = " ".join(chunks[:3])[:1000]
        doc_lang = detect_language(doc_sample)
        st.session_state["doc_lang"] = doc_lang
        st.caption(f"🔹 Detected document language: {'Hindi' if doc_lang == 'hi' else 'English'}")

        with st.spinner("⚙️ Building search index..."):
            embeddings = cache_embeddings(os.path.basename(temp_path), chunks, embed_chunks)
            index = build_faiss_index(embeddings)

        doc_name = os.path.basename(temp_path)
        if st.session_state["last_doc"] != doc_name:
            # New document: regenerate suggestions once, reset the query UI,
            # then rerun so the page renders from the fresh state.
            query_suggestions = generate_dynamic_suggestions_from_toc(toc, chunks, doc_name)
            st.session_state["query_suggestions_fixed"] = query_suggestions
            st.session_state["last_doc"] = doc_name

            st.session_state["user_query_input"] = ""
            st.session_state["selected_suggestion"] = None
            st.session_state["show_more"] = False
            _rerun()
        else:
            query_suggestions = st.session_state["query_suggestions_fixed"]

        st.markdown("### 💬 Ask the Assistant")

        if query_suggestions:
            visible = query_suggestions if st.session_state["show_more"] else query_suggestions[:3]
            cols = st.columns(min(3, len(visible)))
            for i, q in enumerate(visible):
                if cols[i % 3].button(f"💬 {q}", key=f"sugg_{i}"):
                    set_user_query(q, i)

            toggle_text = "Show less ▲" if st.session_state["show_more"] else "Show more ▼"
            if st.button(toggle_text, help="Show or hide more suggestions"):
                st.session_state["show_more"] = not st.session_state["show_more"]
                _rerun()

        user_query = st.text_input("Type your question or click one above:", key="user_query_input")

        if user_query.strip():
            reasoning_mode = mode == "Extended (Document + general)"
            with st.spinner("π Generating your answer..."):
                retrieved = retrieve_chunks(user_query, index, chunks, top_k=top_k, embeddings=embeddings)
                doc_lang = st.session_state.get("doc_lang", "en")
                answer = generate_answer(user_query, retrieved, reasoning_mode=reasoning_mode, doc_lang=doc_lang)

            st.markdown("### 🤖 Assistant’s Answer")

            # Strict-mode answers that are not warnings get their Markdown
            # flattened for the HTML box: bold markers removed, "- " list
            # items turned into bullet lines. "\u26a0" is the warning sign,
            # matched with or without an emoji variation selector after it.
            # NOTE(review): whether the bullet rewrite belongs inside this
            # condition could not be determined from the source - confirm.
            if not reasoning_mode and not answer.startswith("\u26a0"):
                answer = re.sub(r"\*\*(.*?)\*\*", r"\1", answer)
                answer = re.sub(r"(^|\n)-\s*", r"\1<br>• ", answer)
            # NOTE(review): `answer` is inserted into raw HTML unescaped;
            # consider html.escape() if generate_answer can return markup
            # derived from an uploaded document.
            st.markdown(f"<div class='answer-box'>{answer}</div>", unsafe_allow_html=True)

            with st.expander("π Supporting Context"):
                for i, r in enumerate(retrieved, start=1):
                    st.markdown(f"**Chunk {i}:** {r}")

        if toc:
            with st.expander("π Explore Document Sections"):
                toc_text = "\n".join([f"{sec}. {title}" for sec, title in toc])
                st.text_area("Document sections", toc_text, height=140, label_visibility="collapsed")

        with st.expander("π Document Preview"):
            st.text_area("Document preview", text[:1000], height=140, label_visibility="collapsed")
            st.caption(f"{len(chunks)} chunks processed.")
|
|
|