import os
import shutil

import streamlit as st
import torch

# Startup GPU diagnostics, written to the server log (not the UI).
print("CUDA available:", torch.cuda.is_available())
print("Device count:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("GPU name:", torch.cuda.get_device_name(0))
else:
    print("Running on CPU")

st.set_page_config(
    page_title="Enterprise Knowledge Assistant",
    layout="wide",
)


def clean_cache(max_size_gb: float = 2.0):
    """
    Delete cache folders larger than max_size_gb (the torch cache is always
    removed), then make sure /tmp/hf_cache exists for redirected downloads.
    """
    folders = [
        "/root/.cache/huggingface",
        "/root/.cache/transformers",
        "/root/.cache/torch",
    ]
    total_deleted = 0.0

    for folder in folders:
        if os.path.exists(folder):
            # Folder size in GB, summed over every file beneath it.
            size_gb = sum(
                os.path.getsize(os.path.join(dp, f))
                for dp, _, files in os.walk(folder)
                for f in files
            ) / (1024**3)

            if size_gb > max_size_gb or "torch" in folder:
                shutil.rmtree(folder, ignore_errors=True)
                total_deleted += size_gb
                print(f"🗑️ Deleted {folder} ({size_gb:.2f} GB)")
            else:
                print(f"✅ Preserved {folder} ({size_gb:.2f} GB)")

    os.makedirs("/tmp/hf_cache", exist_ok=True)
    print(f"🧹 Cache cleanup done. ~{total_deleted:.2f} GB removed.")


def check_disk_usage():
    """Show disk usage info in the sidebar (debug aid)."""
    st.sidebar.markdown("### 💾 Disk Usage (Debug)")
    try:
        usage = os.popen("du -sh /root/.cache /tmp 2>/dev/null").read()
        st.sidebar.text(usage if usage else "No cache directories found.")
    except Exception as e:
        st.sidebar.text(f"⚠️ Disk usage check failed: {e}")
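

# Hedged alternative (a sketch, not called anywhere): `du` may be missing on
# some hosts, whereas shutil.disk_usage works wherever Python runs. Note it
# reports whole-filesystem totals, not per-directory sizes like `du -sh`.
def check_disk_usage_portable():
    """Sidebar disk stats for the /tmp filesystem via shutil.disk_usage."""
    total, used, _free = shutil.disk_usage("/tmp")
    st.sidebar.text(f"/tmp fs: {used / 1e9:.1f} GB used of {total / 1e9:.1f} GB")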


# Free disk space and show the debug panel once at startup.
clean_cache()
check_disk_usage()

# Redirect every Hugging Face cache to /tmp BEFORE importing any module that
# pulls in transformers/datasets, so model downloads land in the tmp cache.
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update({
    "HF_HOME": CACHE_DIR,
    "TRANSFORMERS_CACHE": CACHE_DIR,
    "HF_DATASETS_CACHE": CACHE_DIR,
    "HF_MODULES_CACHE": CACHE_DIR,
})
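# Assumption about library behavior: recent transformers releases read HF_HOME
# and warn that TRANSFORMERS_CACHE is deprecated; setting both keeps old and
# new versions pointed at the same directory.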

# Project modules. These must be imported after the env setup above so that any
# Hugging Face downloads they trigger go to /tmp/hf_cache.
from ingestion import extract_text_from_pdf, chunk_text
from embeddings import generate_embeddings
from vectorstore import build_faiss_index
from qa import retrieve_chunks, generate_answer
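

# Streamlit reruns this whole script on every widget interaction, so the
# extract/chunk/embed/index pipeline below repeats each run. A minimal cached
# helper sketch (hypothetical, not wired into the branches below; assumes
# chunk_text accepts an `overlap` keyword):
@st.cache_resource(show_spinner=False)
def process_document(path: str, chunk_size: int, overlap: int):
    """Run the pipeline once per (path, chunk_size, overlap) combination."""
    text = extract_text_from_pdf(path)
    chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
    index = build_faiss_index(generate_embeddings(chunks))
    return text, chunks, index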


# Static assets shipped alongside this script.
BASE_DIR = os.path.dirname(__file__)
LOGO_PATH = os.path.join(BASE_DIR, "logo.png")
SAMPLE_PATH = os.path.join(BASE_DIR, "sample.pdf")

st.title("📚 Enterprise Knowledge Assistant")
st.caption("Upload a PDF or use the sample file to explore intelligent document Q&A.")

# Sidebar: branding, reasoning toggle, document choice, and chunking settings.
with st.sidebar:
    if os.path.exists(LOGO_PATH):
        st.image(LOGO_PATH, width=150)

    # Persist the reasoning-mode toggle across reruns.
    if "reasoning_mode" not in st.session_state:
        st.session_state.reasoning_mode = False

    st.session_state.reasoning_mode = st.toggle(
        "🧠 Enable Reasoning Mode",
        value=st.session_state.reasoning_mode,
        help=(
            "When ON, the assistant can use its world knowledge and reasoning ability "
            "to generate richer, more explanatory answers.\n\n"
            "When OFF, it sticks strictly to the document text for factual accuracy."
        ),
    )

    st.markdown("---")

    st.header("📁 Document Library")
    doc_choice = st.radio(
        "Choose a document:",
        ["-- Select --", "Sample PDF", "Upload Custom PDF"],
        index=0,
    )

    st.markdown("---")

    st.header("⚙️ Settings")
    chunk_size = st.slider("Chunk Size (characters)", 200, 800, 500, step=50)
    overlap = st.slider("Chunk Overlap (characters)", 50, 200, 120, step=10)
    top_k = st.slider("Top K Results (retrieved chunks)", 1, 10, 5)

    st.markdown("---")

    st.caption("👨‍💻 Built by Shubham Sharma")
    st.markdown("[🔗 GitHub Repo](https://github.com/shubhamsharma170793-cpu/enterprise-knowledge-assistant)")

# Main area: document selection and processing.
text, chunks, index = None, None, None

if doc_choice == "-- Select --":
    st.info("⬅️ Please choose **Sample PDF** or **Upload Custom PDF** from the sidebar.")

elif doc_choice == "Sample PDF":
    temp_path = SAMPLE_PATH
    st.success("📄 Using built-in Sample PDF")
    with st.spinner("🔍 Extracting and processing document..."):
        text = extract_text_from_pdf(temp_path)
        # Also pass the sidebar overlap setting, which was previously unused
        # (assumes chunk_text accepts an `overlap` keyword).
        chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        embeddings = generate_embeddings(chunks)
        index = build_faiss_index(embeddings)

elif doc_choice == "Upload Custom PDF":
    uploaded_file = st.file_uploader("📎 Upload your PDF", type="pdf")
    if uploaded_file:
        # Write the upload to a temp file so the extractor can read it by path.
        temp_path = os.path.join("/tmp", uploaded_file.name)
        with open(temp_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        st.success(f"✅ File '{uploaded_file.name}' uploaded successfully")

        with st.spinner("⚙️ Extracting and processing your document..."):
            text = extract_text_from_pdf(temp_path)
            chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
            embeddings = generate_embeddings(chunks)
            index = build_faiss_index(embeddings)
        st.success("🎉 Document processed successfully!")

# Document preview and chunk statistics.
if chunks:
    st.subheader("📄 Document Preview")
    st.text_area("Extracted text (first 1000 chars)", text[:1000], height=200)
    avg_len = int(sum(len(c) for c in chunks) / len(chunks))
    st.caption(f"📦 {len(chunks)} chunks created | Avg chunk length: {avg_len} chars")

# Q&A: retrieve the most relevant chunks and generate an answer.
if index is not None and chunks:
    st.markdown("---")
    st.subheader("🤖 Ask a Question")

    user_query = st.text_input("🔎 Your question about the document:")

    if user_query:
        mode_label = (
            "🧠 Reasoning Mode (expanded thinking)"
            if st.session_state.reasoning_mode
            else "📄 Strict Document Mode (factual only)"
        )
        st.caption(f"Mode: {mode_label}")

        with st.spinner("🧠 Thinking... retrieving context and generating answer..."):
            retrieved = retrieve_chunks(user_query, index, chunks, top_k=top_k)
            answer = generate_answer(user_query, retrieved, reasoning_mode=st.session_state.reasoning_mode)

        st.markdown("### ✅ Assistant’s Answer")
        st.markdown(
            f"<div style='background-color:#0E1117;padding:12px;border-radius:10px;color:white;'>{answer}</div>",
            unsafe_allow_html=True,
        )
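
        # Hedged note: `answer` is interpolated into raw HTML above, so markup in
        # the model output renders as-is. A safer sketch using the stdlib:
        #
        #     import html
        #     safe = html.escape(answer)
        #     st.markdown(f"<div style='...'>{safe}</div>", unsafe_allow_html=True)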

        with st.expander("📚 Supporting Chunks (Context Used)"):
            for i, r in enumerate(retrieved, start=1):
                st.markdown(
                    f"""
                    <div style='background-color:#111827;padding:10px;border-radius:8px;margin-bottom:6px;'>
                    <b>Chunk {i}:</b><br>{r}
                    </div>
                    """,
                    unsafe_allow_html=True,
                )

else:
    st.info("📥 Upload or select a document to start exploring.")