|
|
import os |
|
|
import pdfplumber |
|
|
import docx |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import streamlit as st |
|
|
from sentence_transformers import SentenceTransformer |
|
|
from transformers import AutoTokenizer |
|
|
import faiss |
|
|
from groq import Groq |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Propagate the user-supplied `API` env var to GROQ_API_KEY (the name the
# Groq client library reads). Guarded: assigning None into os.environ raises
# TypeError, so a missing key must be skipped rather than assigned.
_groq_key = os.getenv("API")
if _groq_key:
    os.environ["GROQ_API_KEY"] = _groq_key
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Page chrome and the single entry point for user input: one uploaded document.
st.set_page_config(page_title="Universal RAG App", layout="wide")
st.title("π Universal Document RAG (PDF | Word | Excel)")

# Formats the reader functions below know how to parse.
_SUPPORTED_TYPES = ["pdf", "docx", "xlsx"]

uploaded_file = st.file_uploader("Upload a document", type=_SUPPORTED_TYPES)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_pdf_with_plumber(pdf_path):
    """Extract text from a PDF, one record per non-empty page.

    Returns a list of ``{"page": <1-based page number>, "text": <page text>}``
    dicts; pages that yield no text are skipped entirely.
    """
    extracted = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_number, page in enumerate(pdf.pages, start=1):
            # x_tolerance=2 merges characters that are close horizontally,
            # reducing spurious mid-word splits in the extracted text.
            content = page.extract_text(x_tolerance=2)
            if content:
                extracted.append({"page": page_number, "text": content})
    return extracted
|
|
|
|
|
def read_word(doc_path):
    """Read a .docx file into the same page-record shape as the PDF reader.

    Word documents have no stable page boundaries, so the whole document is
    returned as a single record with ``page`` fixed to 1.
    """
    document = docx.Document(doc_path)
    non_empty = (p.text for p in document.paragraphs if p.text.strip())
    return [{"page": 1, "text": "\n\n".join(non_empty)}]
|
|
|
|
|
def read_excel(xlsx_path):
    """Flatten each worksheet of a workbook into one text record.

    Every sheet becomes ``{"page": <sheet name>, "text": <rows joined>}``:
    cells are stringified, joined with spaces within a row, and rows joined
    with newlines. Note ``page`` is a sheet *name* (str) here, unlike the
    integer page numbers from the PDF reader.
    """
    # sheet_name=None loads all sheets as an ordered {name: DataFrame} dict.
    workbook = pd.read_excel(xlsx_path, sheet_name=None)
    records = []
    for sheet_name, frame in workbook.items():
        row_strings = frame.fillna("").astype(str).agg(" ".join, axis=1)
        records.append({"page": sheet_name, "text": row_strings.str.cat(sep="\n")})
    return records
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chunk_text(pages, chunk_size=800):
    """Greedily pack paragraphs into roughly chunk_size-character chunks.

    Each page's text is split on blank lines; consecutive paragraphs are
    merged while the running length stays within ``chunk_size``. A single
    paragraph longer than ``chunk_size`` becomes one oversized chunk rather
    than being split mid-paragraph.

    Args:
        pages: list of ``{"page": ..., "text": str}`` records from a reader.
        chunk_size: soft character budget per chunk.

    Returns:
        list of ``{"page": ..., "text": str}`` chunk records, in order.
    """
    chunks = []
    for page in pages:
        buffer = ""
        for para in page["text"].split("\n\n"):
            if len(buffer) + len(para) <= chunk_size:
                # Join with a single space; avoid the leading-space artifact
                # the original produced on the first paragraph.
                buffer = f"{buffer} {para}" if buffer else para
            else:
                # Fix: only flush non-empty buffers. The original emitted an
                # empty chunk when the very first paragraph of a page already
                # exceeded chunk_size (buffer was still "").
                if buffer.strip():
                    chunks.append({"page": page["page"], "text": buffer.strip()})
                buffer = para
        if buffer.strip():
            chunks.append({"page": page["page"], "text": buffer.strip()})
    return chunks
|
|
|
|
|
def tokenize_chunks(chunks, model_name="sentence-transformers/all-mpnet-base-v2"):
    """Tokenize each chunk's text with the given Hugging Face tokenizer.

    Returns one ``input_ids`` list per chunk, truncated to the model's
    maximum sequence length.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    token_ids = []
    for chunk in chunks:
        encoded = tokenizer(chunk["text"], truncation=True)
        token_ids.append(encoded["input_ids"])
    return token_ids
|
|
|
|
|
def create_embeddings(chunks, model_name="allenai/specter"):
    """Embed every chunk's text with a SentenceTransformer model.

    Returns:
        (model, matrix): the loaded embedder (reused later for queries) and
        a NumPy array of shape (num_chunks, dim).
    """
    model = SentenceTransformer(model_name)
    chunk_texts = [chunk["text"] for chunk in chunks]
    vectors = model.encode(chunk_texts, show_progress_bar=False)
    return model, np.array(vectors)
|
|
|
|
|
def store_embeddings(embeddings):
    """Build an in-memory FAISS index over the embedding matrix.

    Vectors are L2-normalized in place, so the inner-product index
    (IndexFlatIP) effectively ranks by cosine similarity.
    """
    faiss.normalize_L2(embeddings)  # in-place; requires float32, as encode() returns
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)
    return index
|
|
|
|
|
def retrieve_chunks(query, embedder, index, chunks, top_k=None):
    """Return the chunks most similar to ``query``, best first.

    Args:
        query: user question text.
        embedder: the SentenceTransformer returned by create_embeddings.
        index: the FAISS index returned by store_embeddings.
        chunks: the chunk records the index was built from (same order).
        top_k: how many chunks to retrieve; defaults to min(20, len(chunks)).
    """
    if not top_k:
        top_k = min(20, len(chunks))
    query_vec = embedder.encode([query])
    # Normalize so inner-product scores match the cosine ranking of the index.
    faiss.normalize_L2(query_vec)
    scores, indices = index.search(query_vec, top_k)
    # Fix: FAISS pads results with -1 when the index holds fewer than top_k
    # vectors; chunks[-1] would silently return the *last* chunk for those
    # padding slots, so they must be filtered out.
    return [chunks[i] for i in indices[0] if i != -1]
|
|
|
|
|
def build_safe_context(retrieved_chunks, max_chars=12000):
    """Concatenate retrieved chunks into a prompt context capped near max_chars.

    The three highest-ranked chunks are always included, even if they alone
    exceed the cap; lower-ranked chunks are appended in order and the loop
    stops at the first one that would push the total past ``max_chars``.
    Each chunk is rendered as ``(Page N) text`` followed by a blank line.
    """
    parts = []
    total = 0
    for rank, chunk in enumerate(retrieved_chunks):
        block = f"(Page {chunk['page']}) {chunk['text']}\n\n"
        # Only chunks past the guaranteed top-3 are subject to the budget.
        if rank >= 3 and total + len(block) > max_chars:
            break
        parts.append(block)
        total += len(block)
    return "".join(parts)
|
|
|
|
|
def generate_answer(query, context):
    """Ask a Groq-hosted LLM to answer ``query`` using only ``context``.

    The Groq client reads GROQ_API_KEY from the environment. Returns the
    model's answer text.
    """
    client = Groq()

    prompt = f"""
You are a document-based assistant.
Use the context to answer the question clearly.
If the answer is partially available, summarize it.
If the answer is not present, you may say 'Not found in the document'.

Context:
{context}

Question:
{query}
"""

    # Low temperature keeps the answer grounded in the supplied context.
    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return response.choices[0].message.content
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Main script flow: runs top-to-bottom on every Streamlit interaction.
if uploaded_file:
    with st.spinner("π Reading document..."):
        # Fix: use only the basename — a crafted upload filename containing
        # path separators could otherwise write outside the working directory.
        file_name = os.path.basename(uploaded_file.name)

        # Persist the upload to disk because the reader functions take paths.
        with open(file_name, "wb") as f:
            f.write(uploaded_file.getbuffer())

        lowered = file_name.lower()
        if lowered.endswith(".pdf"):
            pages = read_pdf_with_plumber(file_name)
        elif lowered.endswith(".docx"):
            pages = read_word(file_name)
        elif lowered.endswith(".xlsx"):
            pages = read_excel(file_name)
        else:
            st.error("Unsupported file type")
            # Fix: halt this script run. Without st.stop() execution fell
            # through to chunk_text(pages) and crashed with a NameError
            # because `pages` was never assigned.
            st.stop()

    with st.spinner("βοΈ Chunking & embedding document..."):
        chunks = chunk_text(pages)
        # Dropped the original tokenize_chunks(chunks) call here: its return
        # value was discarded, so it only downloaded a tokenizer and did work
        # that was thrown away.
        embedder, embeddings = create_embeddings(chunks)
        index = store_embeddings(embeddings)

    st.success("β Document indexed successfully")

    # NOTE(review): Streamlit reruns this whole script on each interaction,
    # so the document is re-read and re-embedded for every question —
    # consider caching `embedder`/`index`/`chunks` in st.session_state.
    query = st.text_input("β Ask a question")

    if query:
        with st.spinner("π€ Generating answer..."):
            retrieved_chunks = retrieve_chunks(query, embedder, index, chunks)
            context = build_safe_context(retrieved_chunks)
            answer = generate_answer(query, context)

        st.markdown("### β Answer")
        st.write(answer)
|
|
|