|
|
import hashlib
import html
import os
import re
import shutil
import subprocess

import streamlit as st
import torch
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Report on stdout which accelerator (if any) this process can use.
cuda_ready = torch.cuda.is_available()
print("CUDA available:", cuda_ready)
print("Device count:", torch.cuda.device_count())
if not cuda_ready:
    print("Running on CPU")
else:
    print("GPU name:", torch.cuda.get_device_name(0))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Browser-tab title and full-width layout for the whole app.
st.set_page_config(layout="wide", page_title="Enterprise Knowledge Assistant")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _folder_size_bytes(folder):
    """Return the summed size in bytes of every readable file below *folder*."""
    total = 0
    for dir_path, _, file_names in os.walk(folder):
        for file_name in file_names:
            try:
                total += os.path.getsize(os.path.join(dir_path, file_name))
            except OSError:
                # Dangling symlinks and files that vanish while walking must
                # not abort the cleanup; they simply do not count.
                continue
    return total


def clean_cache(max_size_gb: float = 2.0, folders=None):
    """Delete oversized model caches and make sure ``/tmp/hf_cache`` exists.

    Every folder that exists is measured by summing the sizes of the files
    below it. A folder is removed when it is larger than ``max_size_gb`` or
    when its path contains ``"torch"`` (those are removed regardless of size).
    A one-line summary is printed at the end.

    Args:
        max_size_gb: Size threshold, in GiB, above which a folder is deleted.
        folders: Optional iterable of directory paths to inspect. When omitted,
            the Hugging Face, Transformers and Torch caches under
            ``/root/.cache`` are used.

    Returns:
        None.
    """
    if folders is None:
        folders = (
            "/root/.cache/huggingface",
            "/root/.cache/transformers",
            "/root/.cache/torch",
        )

    total_deleted = 0.0
    for folder in folders:
        if not os.path.exists(folder):
            continue
        size_gb = _folder_size_bytes(folder) / (1024**3)
        if size_gb > max_size_gb or "torch" in folder:
            # Best effort: a partially removable cache is not worth a crash.
            shutil.rmtree(folder, ignore_errors=True)
            total_deleted += size_gb

    os.makedirs("/tmp/hf_cache", exist_ok=True)
    # NOTE(review): the leading glyph is mis-decoded in this file; kept as found.
    print(f"π§Ή Cache cleanup done. ~{total_deleted:.2f} GB removed.")
|
|
|
|
|
def check_disk_usage():
    """Show the output of ``du -sh /root/.cache /tmp`` in the sidebar.

    The command is run without a shell, with its error output discarded and a
    timeout, so a slow or missing ``du`` produces a sidebar message instead of
    blocking the script. A non-zero exit status is tolerated, so whatever
    ``du`` did print is still shown.
    """
    # NOTE(review): the glyphs in these labels are mis-decoded in this file;
    # kept as found.
    st.sidebar.markdown("### πΎ Disk Usage (Debug)")
    try:
        completed = subprocess.run(
            ["du", "-sh", "/root/.cache", "/tmp"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=15,
            check=False,
        )
    except (OSError, subprocess.SubprocessError) as e:
        st.sidebar.text(f"β οΈ Disk usage check failed: {e}")
        return

    usage = completed.stdout
    st.sidebar.text(usage if usage else "No cache directories found.")
|
|
|
|
|
# Free disk space first, then report what is left in the sidebar.
clean_cache()
check_disk_usage()

# Point every Hugging Face cache location at one directory under /tmp.
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update(
    dict.fromkeys(
        ("HF_HOME", "TRANSFORMERS_CACHE", "HF_DATASETS_CACHE", "HF_MODULES_CACHE"),
        CACHE_DIR,
    )
)

# NOTE(review): these project imports presumably sit below the environment
# setup on purpose, so the modules see the cache variables when they load;
# confirm before hoisting them to the top of the file.
from ingestion import extract_text_from_pdf, chunk_text
from vectorstore import build_faiss_index
from qa import (
    retrieve_chunks,
    generate_answer,
    cache_embeddings,
    embed_chunks,
    genai_generate,
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Leading section number such as "3", "1.2", "1." or "2)" that is followed by
# whitespace or the end of the title. Requiring that separator keeps titles
# like "3D Printing" intact.
_SECTION_NUMBER_RE = re.compile(r"^\d+(?:\.\d+)*[.)]?(?:\s+|$)")


def clean_toc_titles(toc):
    """Return TOC titles with leading section numbers removed.

    Args:
        toc: Iterable of ``(section, title)`` pairs; only the title is used.
            ``None`` or an empty iterable yields an empty list.

    Returns:
        The cleaned titles, in input order, keeping only those longer than
        three characters after cleaning.
    """
    if not toc:
        return []
    cleaned = (
        _SECTION_NUMBER_RE.sub("", title.strip()).strip() for _, title in toc
    )
    return [title for title in cleaned if len(title) > 3]
|
|
|
|
|
|
|
|
# Ordered keyword rules: the first rule with a keyword contained in the
# lower-cased title wins. "{title}" is replaced by the lower-cased title.
_KEYWORD_SUGGESTIONS = (
    (("prerequisite",), "What are the prerequisites for setting this up?"),
    (("restriction",), "What are the key restrictions or limitations?"),
    (("configuration", "setup"), "How do I {title}?"),
    (("overview", "introduction"), "Can you give me an overview of this document?"),
    (("purpose",), "What is the purpose of this guide?"),
    (("example",), "Can you show an example from this document?"),
)


def _suggestion_for_title(title):
    """Map one TOC title to a single suggested question."""
    lowered = title.lower()
    for keywords, template in _KEYWORD_SUGGESTIONS:
        if any(keyword in lowered for keyword in keywords):
            return template.format(title=lowered)
    if "process" in lowered:
        # Avoid "... approval process process?" when the title already ends
        # with the word.
        if lowered.endswith("process"):
            return f"Can you explain the {lowered}?"
        return f"Can you explain the {lowered} process?"
    return f"Explain the section about {lowered}."


def generate_query_suggestions(toc_titles, max_suggestions=6):
    """Turn TOC titles into a short list of suggested questions.

    Args:
        toc_titles: Iterable of title strings. ``None`` or an empty iterable
            yields an empty list.
        max_suggestions: Maximum number of questions to return.

    Returns:
        Unique questions in first-seen order, at most ``max_suggestions``.
    """
    if not toc_titles:
        return []
    # dict.fromkeys drops duplicates while keeping first-seen order.
    unique = dict.fromkeys(_suggestion_for_title(title) for title in toc_titles)
    return list(unique)[:max_suggestions]
|
|
|
|
|
|
|
|
# List decoration at the start of a line: "-", "*", bullet, en/em dash, or a
# number followed by "." or ")". Written with escapes so the pattern survives
# re-encoding of this file.
_LIST_PREFIX_RE = re.compile(r"^\s*(?:[-*\u2022\u2013\u2014]+|\d+[.)])\s*")

# Returned when the model answered but no usable question could be parsed.
_DEFAULT_QUESTIONS = (
    "What is this document about?",
    "How do I start using the process described here?",
    "What key setup steps are involved?",
    "What benefits or objectives are explained?",
)

# Returned when calling the model (or parsing its reply) raised.
_FAILURE_QUESTIONS = (
    "Can you summarize the document?",
    "What is the main idea here?",
    "How does this guide help me?",
)


def _extract_questions(ai_response, limit=6):
    """Pull up to *limit* question lines out of a free-text model reply."""
    questions = []
    for line in ai_response.splitlines():
        candidate = _LIST_PREFIX_RE.sub("", line).strip()
        # Same bounds as before, applied after the decoration is removed.
        if 8 < len(candidate) < 120 and candidate.endswith("?"):
            questions.append(candidate)
    return questions[:limit]


def generate_ai_dynamic_suggestions(chunks, doc_name="Document"):
    """Ask the generative model for question suggestions about a document.

    The first three chunks are joined, truncated to 3000 characters, and sent
    to ``genai_generate`` inside a prompt. Lines of the reply that look like
    questions are returned.

    Args:
        chunks: Sequence of text chunks; ``None`` or empty returns ``[]``.
        doc_name: Title inserted into the prompt.

    Returns:
        Up to six questions parsed from the reply, a canned list when none
        could be parsed, or a different canned list when the call failed.
    """
    if not chunks:
        return []

    sample_text = " ".join(chunks[:3])[:3000]
    prompt = f"""
You are an intelligent assistant helping users explore enterprise documentation titled '{doc_name}'.

Based on the content below, generate 5 short, interactive, human-like questions
that a curious user might ask to understand this document better.
Avoid section numbers, and sound conversational.

---
Content Sample:
{sample_text}
---
Questions:
"""

    try:
        ai_response = genai_generate(prompt)
        questions = _extract_questions(ai_response)
        return questions if questions else list(_DEFAULT_QUESTIONS)
    except Exception as e:
        # Deliberately broad: suggestions are optional, so any failure of the
        # external call falls back to canned questions instead of breaking
        # the page.
        print(f"β οΈ AI suggestion generation failed: {e}")
        return list(_FAILURE_QUESTIONS)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Assets are looked up next to this script.
BASE_DIR = os.path.dirname(__file__)
LOGO_PATH, SAMPLE_PATH = (
    os.path.join(BASE_DIR, asset) for asset in ("logo.png", "sample.pdf")
)

# Page header.
st.title("π Enterprise Knowledge Assistant")
st.caption("Query SAP documentation and enterprise PDFs using natural language and reasoning.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Sidebar: logo, reasoning toggle, document picker and chunking settings.
# doc_choice, chunk_size, overlap and top_k are read by the code below it.
with st.sidebar:
    if os.path.exists(LOGO_PATH):
        st.image(LOGO_PATH, width=150)

    # Seed the flag once so the toggle has a stored value to start from.
    if "reasoning_mode" not in st.session_state:
        st.session_state["reasoning_mode"] = False

    st.session_state["reasoning_mode"] = st.toggle(
        "π§ Enable Reasoning Mode",
        value=st.session_state["reasoning_mode"],
        help="When ON: GPT-4o uses reasoning + synthesis.\nWhen OFF: strictly factual.",
    )

    st.markdown("---")
    st.header("π Document Library")
    document_options = ["-- Select --", "Sample PDF", "Upload Custom PDF"]
    doc_choice = st.radio("Choose a document:", document_options, index=0)

    st.markdown("---")
    st.header("βοΈ Settings")
    chunk_size = st.slider("Chunk Size", min_value=200, max_value=1500, value=800, step=50)
    overlap = st.slider("Chunk Overlap", min_value=50, max_value=200, value=120, step=10)
    top_k = st.slider("Top K Results", min_value=1, max_value=10, value=5)
    st.markdown("---")
    st.caption("π¨βπ» Built by Shubham Sharma")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Results of the document pipeline; they stay None until a document has been
# processed in this run.
text, chunks, index, embeddings, toc = None, None, None, None, None

# NOTE(review): several UI strings below begin with mis-decoded glyphs; they
# are kept as found because the original characters cannot be recovered here.
if doc_choice == "-- Select --":
    st.info("β¬οΈ Please choose a document from the sidebar.")

elif doc_choice in ["Sample PDF", "Upload Custom PDF"]:
    temp_path = SAMPLE_PATH if doc_choice == "Sample PDF" else None
    if doc_choice == "Upload Custom PDF":
        uploaded_file = st.file_uploader("π Upload your PDF", type="pdf")
        if uploaded_file:
            # The file name comes from the client: keep only its last path
            # component so the upload cannot be written outside /tmp.
            safe_name = os.path.basename(uploaded_file.name)
            if safe_name in ("", ".", ".."):
                safe_name = "uploaded.pdf"
            temp_path = os.path.join("/tmp", safe_name)
            with open(temp_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            st.success(f"β File '{uploaded_file.name}' uploaded successfully")

    if temp_path:
        with st.spinner("π Extracting and processing document..."):
            text, toc = extract_text_from_pdf(temp_path)
            # NOTE(review): the "Chunk Overlap" slider value (overlap) is not
            # passed here; confirm whether chunk_text accepts it.
            chunks = chunk_text(text, chunk_size=chunk_size)
        st.write(f"π Extracted {len(chunks)} chunks.")

        if not chunks:
            # Nothing to embed or index (e.g. a PDF without extractable text).
            st.warning("No text could be extracted from this document.")
        else:
            if toc:
                st.markdown("### π§ Detected Table of Contents")
                toc_text = "\n".join([f"{sec}. {title}" for sec, title in toc])
                st.text_area("TOC Preview", toc_text, height=200)

                clean_titles = clean_toc_titles(toc)
                query_suggestions = generate_query_suggestions(clean_titles)
            else:
                st.warning("β οΈ No TOC detected β generating dynamic suggestions using AI...")
                query_suggestions = generate_ai_dynamic_suggestions(
                    chunks, doc_name=os.path.basename(temp_path)
                )

            if query_suggestions:
                st.markdown("#### π‘ Suggested Questions")
                cols = st.columns(2)
                for i, q in enumerate(query_suggestions):
                    # An explicit key keeps buttons distinct even when two
                    # suggestions have the same text.
                    if cols[i % 2].button(f"π {q}", key=f"suggestion_{i}"):
                        st.session_state["user_query"] = q

            with st.spinner("βοΈ Loading cached embeddings or generating new ones..."):
                embeddings = cache_embeddings(
                    os.path.basename(temp_path), chunks, embed_chunks
                )
                index = build_faiss_index(embeddings)
            st.success("π Document processed successfully!")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Question answering UI: shown only once a document has been indexed.
# NOTE(review): several UI strings below begin with mis-decoded glyphs; they
# are kept as found because the original characters cannot be recovered here.
if index is not None and chunks:
    st.markdown("---")
    st.subheader("π€ Ask a Question")

    # Pre-filled with the last suggestion button the user clicked, if any.
    user_query = st.text_input(
        "π Your question about the document:",
        value=st.session_state.get("user_query", ""),
    )

    if user_query:
        mode_label = (
            "π§ Reasoning Mode (expanded thinking)"
            if st.session_state.reasoning_mode
            else "π Strict Document Mode (factual only)"
        )
        st.caption(f"Mode: {mode_label}")

        with st.spinner("π§ Thinking... retrieving context and generating answer..."):
            retrieved = retrieve_chunks(
                user_query, index, chunks, top_k=top_k, embeddings=embeddings
            )
            answer = generate_answer(
                user_query, retrieved, reasoning_mode=st.session_state.reasoning_mode
            )

        st.markdown("### β Assistantβs Answer")
        # The answer and the chunks are rendered as raw HTML, so they are
        # escaped first: text from the model or the PDF must not be able to
        # inject markup into the page.
        st.markdown(
            "<div style='background-color:#0E1117;padding:12px;border-radius:10px;color:white;'>"
            f"{html.escape(str(answer), quote=False)}</div>",
            unsafe_allow_html=True,
        )

        with st.expander("π Supporting Chunks (Context Used)"):
            for i, r in enumerate(retrieved, start=1):
                # Built without leading indentation so the snippet always
                # starts with the <div> tag.
                chunk_html = (
                    "<div style='background-color:#111827;padding:10px;border-radius:8px;margin-bottom:6px;'>"
                    f"<b>Chunk {i}:</b><br>{html.escape(str(r), quote=False)}"
                    "</div>"
                )
                st.markdown(chunk_html, unsafe_allow_html=True)
else:
    st.info("π₯ Upload or select a document to start exploring.")
|
|
|