"""Streamlit entry point for the Enterprise Knowledge Assistant.

Flow: clean oversized Hugging Face caches, redirect cache paths to the
writable /tmp, then ingest a PDF (extract -> chunk -> embed -> FAISS
index) and answer questions against the retrieved chunks.
"""

import os
import shutil
import subprocess

import streamlit as st

# ==========================================================
# βœ… Page Configuration (must be first Streamlit command)
# ==========================================================
st.set_page_config(
    page_title="Enterprise Knowledge Assistant",
    layout="wide",
)


# ==========================================================
# 🧹 Cache Management (prevents Hugging Face 50GB overflow)
# ==========================================================
def clean_cache(max_size_gb: float = 2.0) -> None:
    """Delete cache folders larger than ``max_size_gb`` GB.

    The torch cache is removed unconditionally; the other folders are
    preserved when small. /tmp/hf_cache is re-created afterwards so the
    redirected Hugging Face cache always has a writable target.
    """
    folders = [
        "/root/.cache/huggingface",
        "/root/.cache/transformers",
        "/root/.cache/torch",
        "/tmp/hf_cache",
    ]
    total_deleted = 0.0
    for folder in folders:
        if not os.path.exists(folder):
            continue
        # Estimate folder size. getsize can raise OSError for broken
        # symlinks or files removed mid-walk; skip those entries rather
        # than aborting the whole cleanup.
        size_bytes = 0
        for dirpath, _, filenames in os.walk(folder):
            for fname in filenames:
                try:
                    size_bytes += os.path.getsize(os.path.join(dirpath, fname))
                except OSError:
                    continue
        size_gb = size_bytes / (1024 ** 3)
        # Torch cache is always expendable; others only when oversized.
        if size_gb > max_size_gb or "torch" in folder:
            shutil.rmtree(folder, ignore_errors=True)
            total_deleted += size_gb
            print(f"πŸ—‘οΈ Deleted {folder} ({size_gb:.2f} GB)")
        else:
            print(f"βœ… Preserved {folder} ({size_gb:.2f} GB)")
    os.makedirs("/tmp/hf_cache", exist_ok=True)
    print(f"🧹 Cache cleanup done. ~{total_deleted:.2f} GB removed.")


def check_disk_usage() -> None:
    """Show disk usage info in sidebar (debug aid)."""
    st.sidebar.markdown("### πŸ’Ύ Disk Usage (Debug)")
    try:
        # subprocess.run with an argument list avoids the shell entirely
        # (the original used os.popen with a shell redirect).
        result = subprocess.run(
            ["du", "-sh", "/root/.cache", "/tmp"],
            capture_output=True,
            text=True,
            check=False,
        )
        usage = result.stdout
        st.sidebar.text(usage if usage else "No cache directories found.")
    except Exception as e:
        st.sidebar.text(f"⚠️ Disk usage check failed: {e}")


# Run cleanup & diagnostics before anything can start downloading models.
clean_cache()
check_disk_usage()

# ==========================================================
# βš™οΈ Hugging Face Cache Configuration (/tmp for writable path)
# ==========================================================
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update({
    "HF_HOME": CACHE_DIR,
    "TRANSFORMERS_CACHE": CACHE_DIR,
    "HF_DATASETS_CACHE": CACHE_DIR,
    "HF_MODULES_CACHE": CACHE_DIR,
})

# ==========================================================
# πŸ“¦ Imports AFTER environment setup, so the libraries read the
# redirected cache paths on first import.
# ==========================================================
from ingestion import extract_text_from_pdf, chunk_text
from embeddings import generate_embeddings
from vectorstore import build_faiss_index
from qa import retrieve_chunks, generate_answer

# ==========================================================
# πŸ“ Paths
# ==========================================================
BASE_DIR = os.path.dirname(__file__)  # /app/src
LOGO_PATH = os.path.join(BASE_DIR, "logo.png")
SAMPLE_PATH = os.path.join(BASE_DIR, "sample.pdf")

# ==========================================================
# πŸ–₯️ UI Header
# ==========================================================
st.title("πŸ“„ Enterprise Knowledge Assistant")
st.caption("Upload a PDF or use the sample file to explore intelligent document Q&A.")

# ==========================================================
# 🧭 Sidebar (Document Library + Settings + Diagnostics)
# ==========================================================
with st.sidebar:
    if os.path.exists(LOGO_PATH):
        st.image(LOGO_PATH, width=150)

    st.header("πŸ“š Document Library")
    doc_choice = st.radio(
        "Choose a document:",
        ["-- Select --", "Sample PDF", "Upload Custom PDF"],
        index=0,
    )

    st.markdown("---")
    st.header("βš™οΈ Settings")
    chunk_size = st.slider("Chunk Size (characters)", 300, 1200, 800, step=100)
    top_k = st.slider("Top K Results (retrieved chunks)", 1, 10, 5)

    st.markdown("---")
    st.caption("πŸ‘¨β€πŸ’» Built by Shubham Sharma")
    st.markdown("[πŸ“‚ GitHub Repo](https://github.com/shubhamsharma170793-cpu/enterprise-knowledge-assistant)")


def _process_document(pdf_path: str, size: int):
    """Run the shared pipeline: extract text, chunk, embed, index.

    Returns (text, chunks, index); previously duplicated in both the
    sample-PDF and upload branches.
    """
    doc_text = extract_text_from_pdf(pdf_path)
    doc_chunks = chunk_text(doc_text, chunk_size=size)
    vectors = generate_embeddings(doc_chunks)
    return doc_text, doc_chunks, build_faiss_index(vectors)


# ==========================================================
# 🧾 Document Handling
# ==========================================================
text, chunks, index = None, None, None

if doc_choice == "-- Select --":
    st.info("⬅️ Please choose **Sample PDF** or **Upload Custom PDF** from the sidebar.")

elif doc_choice == "Sample PDF":
    # Guard: the bundled asset may be missing from the image.
    if not os.path.exists(SAMPLE_PATH):
        st.error("⚠️ Built-in sample PDF not found.")
    else:
        st.success("πŸ“˜ Using built-in Sample PDF")
        with st.spinner("πŸ” Extracting and processing document..."):
            text, chunks, index = _process_document(SAMPLE_PATH, chunk_size)

elif doc_choice == "Upload Custom PDF":
    uploaded_file = st.file_uploader("πŸ“‚ Upload your PDF", type="pdf")
    if uploaded_file:
        # Persist to /tmp so the PDF parser can read from a real path.
        temp_path = os.path.join("/tmp", uploaded_file.name)
        with open(temp_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        st.success(f"βœ… File '{uploaded_file.name}' uploaded successfully")

        with st.spinner("βš™οΈ Extracting and processing your document..."):
            text, chunks, index = _process_document(temp_path, chunk_size)
        st.success("πŸš€ Document processed successfully!")

# ==========================================================
# πŸ“‘ Document Preview
# ==========================================================
if chunks:
    st.subheader("πŸ“‘ Document Preview")
    st.text_area("Extracted text (first 1000 chars)", text[:1000], height=200)
    avg_len = int(sum(len(c) for c in chunks) / len(chunks))
    st.caption(f"πŸ“¦ {len(chunks)} chunks created | Avg chunk length: {avg_len} chars")

# ==========================================================
# πŸ’¬ Query Section
# ==========================================================
if index and chunks:
    st.markdown("---")
    st.subheader("πŸ€– Ask a Question")
    user_query = st.text_input("πŸ” Your question about the document:")

    if user_query:
        with st.spinner("🧠 Thinking... retrieving context and generating answer..."):
            retrieved = retrieve_chunks(user_query, index, chunks, top_k=top_k)
            answer = generate_answer(user_query, retrieved)

        # βœ… Answer Display
        st.markdown("### βœ… Assistant’s Answer")
        st.markdown(f"{answer}", unsafe_allow_html=True)

        # πŸ“„ Supporting Chunks
        with st.expander("πŸ“„ Supporting Chunks (Context Used)"):
            for i, r in enumerate(retrieved, start=1):
                st.markdown(
                    f"""Chunk {i}:

{r}
""",
                    unsafe_allow_html=True,
                )
else:
    st.info("πŸ“₯ Upload or select a document to start exploring.")