import os
import shutil
import streamlit as st
import torch
print("CUDA available:", torch.cuda.is_available())
print("Device count:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("GPU name:", torch.cuda.get_device_name(0))
else:
    print("Running on CPU")
# ==========================================================
# ✅ Page Configuration (must be first Streamlit command)
# ==========================================================
st.set_page_config(
    page_title="Enterprise Knowledge Assistant",
    layout="wide"
)
# ==========================================================
# 🧹 Cache Management (prevents Hugging Face 50GB overflow)
# ==========================================================
def clean_cache(max_size_gb: float = 2.0):
    """
    Deletes cache folders larger than max_size_gb (the torch cache is always
    removed). /tmp/hf_cache is deliberately never touched.
    """
    folders = [
        "/root/.cache/huggingface",
        "/root/.cache/transformers",
        "/root/.cache/torch",
        # "/tmp/hf_cache",  # 🚫 DO NOT DELETE: used by Mistral for offloading
    ]
    total_deleted = 0.0
    for folder in folders:
        if os.path.exists(folder):
            # Estimate folder size by summing every file beneath it.
            size_gb = sum(
                os.path.getsize(os.path.join(dp, f))
                for dp, _, files in os.walk(folder)
                for f in files
            ) / (1024**3)
            # Delete oversized folders; the torch cache is purged unconditionally.
            if size_gb > max_size_gb or "torch" in folder:
                shutil.rmtree(folder, ignore_errors=True)
                total_deleted += size_gb
                print(f"🗑️ Deleted {folder} ({size_gb:.2f} GB)")
            else:
                print(f"✅ Preserved {folder} ({size_gb:.2f} GB)")
    os.makedirs("/tmp/hf_cache", exist_ok=True)
    print(f"🧹 Cache cleanup done. ~{total_deleted:.2f} GB removed.")

def check_disk_usage():
    """Show disk usage info in the sidebar."""
    st.sidebar.markdown("### 💾 Disk Usage (Debug)")
    try:
        usage = os.popen("du -sh /root/.cache /tmp 2>/dev/null").read()
        st.sidebar.text(usage if usage else "No cache directories found.")
    except Exception as e:
        st.sidebar.text(f"⚠️ Disk usage check failed: {e}")

# Run cleanup & diagnostics
clean_cache()
check_disk_usage()
# ==========================================================
# βš™οΈ Hugging Face Cache Configuration (/tmp for writable path)
# ==========================================================
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update({
    "HF_HOME": CACHE_DIR,
    "TRANSFORMERS_CACHE": CACHE_DIR,
    "HF_DATASETS_CACHE": CACHE_DIR,
    "HF_MODULES_CACHE": CACHE_DIR,
})
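# NOTE: TRANSFORMERS_CACHE is deprecated in recent transformers releases
# (HF_HOME alone covers it there); it is kept for compatibility with older versions.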
# ==========================================================
# 📦 Imports AFTER environment setup
# ==========================================================
from ingestion import extract_text_from_pdf, chunk_text
from embeddings import generate_embeddings
from vectorstore import build_faiss_index
from qa import retrieve_chunks, generate_answer
# ==========================================================
# πŸ“ Paths
# ==========================================================
BASE_DIR = os.path.dirname(__file__) # /app/src
LOGO_PATH = os.path.join(BASE_DIR, "logo.png")
SAMPLE_PATH = os.path.join(BASE_DIR, "sample.pdf")
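# Optional guard (a sketch; assumes sample.pdf ships alongside this script):
# warn early if the bundled sample is missing instead of failing later
# inside extract_text_from_pdf.
if not os.path.exists(SAMPLE_PATH):
    print(f"⚠️ sample.pdf not found at {SAMPLE_PATH}; the Sample PDF option will not work.")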
# ==========================================================
# 🖥️ UI Header
# ==========================================================
st.title("πŸ“„ Enterprise Knowledge Assistant")
st.caption("Upload a PDF or use the sample file to explore intelligent document Q&A.")
# ==========================================================
# 🧭 Sidebar (Document Library + Settings + Diagnostics)
# ==========================================================
with st.sidebar:
    # 🖼️ App Logo (if available)
    if os.path.exists(LOGO_PATH):
        st.image(LOGO_PATH, width=150)

    # 🧠 Reasoning Mode Toggle (Persistent)
    if "reasoning_mode" not in st.session_state:
        st.session_state.reasoning_mode = False  # Default OFF

    st.session_state.reasoning_mode = st.toggle(
        "🧠 Enable Reasoning Mode",
        value=st.session_state.reasoning_mode,
        help=(
            "When ON, the assistant can use its world knowledge and reasoning ability "
            "to generate richer, more explanatory answers.\n\n"
            "When OFF, it sticks strictly to the document text for factual accuracy."
        )
    )
    st.markdown("---")

    # 📚 Document Library
    st.header("📚 Document Library")
    doc_choice = st.radio(
        "Choose a document:",
        ["-- Select --", "Sample PDF", "Upload Custom PDF"],
        index=0
    )
    st.markdown("---")

    # ⚙️ Settings
    st.header("⚙️ Settings")
    chunk_size = st.slider("Chunk Size (characters)", 200, 800, 500, step=50)
    overlap = st.slider("Chunk Overlap (characters)", 50, 200, 120, step=10)
    top_k = st.slider("Top K Results (retrieved chunks)", 1, 10, 5)
    st.markdown("---")

    # 👨‍💻 Branding
    st.caption("👨‍💻 Built by Shubham Sharma")
    st.markdown("[📂 GitHub Repo](https://github.com/shubhamsharma170793-cpu/enterprise-knowledge-assistant)")
# ==========================================================
# 🧾 Document Handling
# ==========================================================
text, chunks, index = None, None, None
if doc_choice == "-- Select --":
    st.info("⬅️ Please choose **Sample PDF** or **Upload Custom PDF** from the sidebar.")

elif doc_choice == "Sample PDF":
    temp_path = SAMPLE_PATH
    st.success("📘 Using built-in Sample PDF")
    with st.spinner("🔍 Extracting and processing document..."):
        text = extract_text_from_pdf(temp_path)
        # Assumption: chunk_text (from ingestion) accepts an `overlap` kwarg;
        # previously the sidebar's overlap slider was never used.
        chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        embeddings = generate_embeddings(chunks)
        index = build_faiss_index(embeddings)

elif doc_choice == "Upload Custom PDF":
    uploaded_file = st.file_uploader("📂 Upload your PDF", type="pdf")
    if uploaded_file:
        temp_path = os.path.join("/tmp", uploaded_file.name)
        with open(temp_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        st.success(f"✅ File '{uploaded_file.name}' uploaded successfully")
        with st.spinner("⚙️ Extracting and processing your document..."):
            text = extract_text_from_pdf(temp_path)
            chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
            embeddings = generate_embeddings(chunks)
            index = build_faiss_index(embeddings)
        st.success("🚀 Document processed successfully!")
# ==========================================================
# 📑 Document Preview
# ==========================================================
if chunks:
    st.subheader("📑 Document Preview")
    st.text_area("Extracted text (first 1000 chars)", text[:1000], height=200)
    avg_len = int(sum(len(c) for c in chunks) / len(chunks))
    st.caption(f"📦 {len(chunks)} chunks created | Avg chunk length: {avg_len} chars")
# ==========================================================
# 🤖 Query Section
# ==========================================================
if index and chunks:
    st.markdown("---")
    st.subheader("🤖 Ask a Question")
    user_query = st.text_input("🔍 Your question about the document:")

    if user_query:
        # Show which mode is active
        mode_label = (
            "🧠 Reasoning Mode (expanded thinking)"
            if st.session_state.reasoning_mode
            else "📄 Strict Document Mode (factual only)"
        )
        st.caption(f"Mode: {mode_label}")

        # Generate the answer
        with st.spinner("🧠 Thinking... retrieving context and generating answer..."):
            retrieved = retrieve_chunks(user_query, index, chunks, top_k=top_k)
            answer = generate_answer(user_query, retrieved, reasoning_mode=st.session_state.reasoning_mode)

        # ✅ Display Answer
        st.markdown("### ✅ Assistant’s Answer")
        st.markdown(
            f"<div style='background-color:#0E1117;padding:12px;border-radius:10px;color:white;'>{answer}</div>",
            unsafe_allow_html=True
        )

        # 📄 Supporting Chunks
        with st.expander("📄 Supporting Chunks (Context Used)"):
            for i, r in enumerate(retrieved, start=1):
                st.markdown(
                    f"""
                    <div style='background-color:#111827;padding:10px;border-radius:8px;margin-bottom:6px;'>
                    <b>Chunk {i}:</b><br>{r}
                    </div>
                    """,
                    unsafe_allow_html=True,
                )
else:
    st.info("📥 Upload or select a document to start exploring.")