Spaces:
Sleeping
Sleeping
| import os | |
| import docx | |
| import pandas as pd | |
| import numpy as np | |
| import streamlit as st | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import AutoTokenizer | |
| import faiss | |
| from groq import Groq | |
| # ========================================================== | |
| # GROQ API KEY (use HF Secrets) | |
| # ========================================================== | |
# The key is supplied through the "API" environment variable and re-exported
# as GROQ_API_KEY (presumably what the argument-less Groq() client reads —
# confirm against the Groq SDK docs). os.environ only accepts strings, so
# assigning the None returned for a missing variable raises TypeError at
# import time; only copy the value across when it is actually set.
_api_secret = os.getenv("API")
if _api_secret is not None:
    os.environ["GROQ_API_KEY"] = _api_secret
| # ========================================================== | |
| # STREAMLIT UI | |
| # ========================================================== | |
# Page chrome: wide layout, a title, and the .docx upload widget.
# NOTE(review): the glyph at the start of the title looks like a mis-decoded
# emoji — confirm against the deployed file before changing it.
st.set_page_config(layout="wide", page_title="Word RAG App")
st.title("π Word Document RAG")

# Only .docx uploads are accepted; the value is falsy until a file is chosen.
uploaded_file = st.file_uploader("Upload a Word document (.docx)", type=["docx"])
| # ========================================================== | |
| # WORD TEXT EXTRACTION (UNCHANGED) | |
| # ========================================================== | |
def read_word(doc_path):
    """Read a Word document and return its text as a single pseudo-page.

    Args:
        doc_path: Value handed straight to ``docx.Document``.

    Returns:
        A one-element list ``[{"page": 1, "text": ...}]`` where ``text`` is
        every non-blank paragraph joined by a blank line. The page number is
        always 1; no real pagination is derived from the document.
    """
    document = docx.Document(doc_path)

    # Keep only paragraphs that contain something other than whitespace.
    kept_paragraphs = []
    for paragraph in document.paragraphs:
        if paragraph.text.strip():
            kept_paragraphs.append(paragraph.text)

    return [{"page": 1, "text": "\n\n".join(kept_paragraphs)}]
| # ========================================================== | |
| # CORE RAG FUNCTIONS (UNCHANGED) | |
| # ========================================================== | |
def chunk_text(pages, chunk_size=800):
    """Group each page's paragraphs into chunks of roughly ``chunk_size`` chars.

    Paragraphs (separated by a blank line) are packed greedily: a paragraph is
    appended to the current chunk, joined by a single space, as long as the
    result stays within ``chunk_size``; otherwise the current chunk is emitted
    and the paragraph starts a new one. Chunks never span pages.

    Args:
        pages: Iterable of dicts with ``"page"`` and ``"text"`` keys.
        chunk_size: Maximum number of characters for a chunk built from
            several paragraphs. A single paragraph longer than this is kept
            whole rather than split.

    Returns:
        A list of ``{"page": ..., "text": ...}`` dicts. Empty chunks are
        never produced: blank paragraphs are skipped, and an oversized first
        paragraph no longer flushes an empty buffer ahead of itself.
    """
    chunks = []
    for page in pages:
        buffer = ""
        for para in page["text"].split("\n\n"):
            if not para.strip():
                # Whitespace-only paragraphs carry no content to retrieve.
                continue
            if not buffer:
                buffer = para
            elif len(buffer) + 1 + len(para) <= chunk_size:
                # The +1 accounts for the joining space, so a multi-paragraph
                # chunk cannot overshoot chunk_size.
                buffer += " " + para
            else:
                chunks.append({"page": page["page"], "text": buffer.strip()})
                buffer = para
        if buffer:
            chunks.append({"page": page["page"], "text": buffer.strip()})
    return chunks
def tokenize_chunks(chunks, model_name="sentence-transformers/all-mpnet-base-v2"):
    """Tokenize the text of every chunk with a pretrained tokenizer.

    Args:
        chunks: Sequence of dicts that each carry a ``"text"`` entry.
        model_name: Identifier passed to ``AutoTokenizer.from_pretrained``.

    Returns:
        A list with one ``input_ids`` sequence per chunk, in input order.
        Tokenization is requested with ``truncation=True``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    token_ids = []
    for chunk in chunks:
        encoded = tokenizer(chunk["text"], truncation=True)
        token_ids.append(encoded["input_ids"])
    return token_ids
def create_embeddings(chunks, model_name="allenai/specter"):
    """Embed the text of every chunk with a SentenceTransformer model.

    Args:
        chunks: Sequence of dicts that each carry a ``"text"`` entry.
        model_name: Identifier passed to ``SentenceTransformer``.

    Returns:
        A ``(embedder, embeddings)`` tuple: the loaded model, so the caller
        can encode queries with it, and a NumPy array built from the encoded
        chunk texts.
    """
    embedder = SentenceTransformer(model_name)
    vectors = embedder.encode(
        [chunk["text"] for chunk in chunks],
        show_progress_bar=False,
    )
    return embedder, np.array(vectors)
def store_embeddings(embeddings):
    """Build an inner-product FAISS index over L2-normalised embeddings.

    Normalising the vectors before adding them to an ``IndexFlatIP`` makes
    the inner-product scores behave as cosine similarity.

    Args:
        embeddings: 2-D array-like of shape ``(n_vectors, dim)``.

    Returns:
        A ``faiss.IndexFlatIP`` containing the normalised vectors.

    Raises:
        ValueError: If ``embeddings`` is empty or not two-dimensional.
    """
    # Work on a private float32, C-contiguous copy: normalize_L2 operates in
    # place, and the caller's array should not be rewritten as a side effect.
    vectors = np.array(embeddings, dtype=np.float32, order="C")
    if vectors.ndim != 2 or vectors.shape[0] == 0:
        raise ValueError("embeddings must be a non-empty 2-D array")

    faiss.normalize_L2(vectors)
    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(vectors)
    return index
def retrieve_chunks(query, embedder, index, chunks, top_k=None):
    """Return the chunks most similar to ``query``, best match first.

    Args:
        query: The question text to embed.
        embedder: Object whose ``encode`` method turns a list of strings into
            vectors compatible with ``index``.
        index: FAISS index to search.
        chunks: The chunk list the index was built from, in the same order.
        top_k: Number of chunks to return. ``None`` or ``0`` selects the
            default of 20. The value is capped at ``len(chunks)``.

    Returns:
        A list of at most ``top_k`` entries of ``chunks``; empty when
        ``chunks`` is empty.
    """
    if not chunks:
        # Nothing to retrieve; also avoids asking FAISS for k == 0 results.
        return []

    if not top_k:
        top_k = 20
    top_k = min(top_k, len(chunks))

    query_vec = embedder.encode([query])
    faiss.normalize_L2(query_vec)
    _scores, indices = index.search(query_vec, top_k)

    # FAISS pads missing results with -1; without the bounds check a -1 would
    # silently resolve to the last chunk through negative indexing.
    return [chunks[i] for i in indices[0] if 0 <= i < len(chunks)]
def build_safe_context(retrieved_chunks, max_chars=12000):
    """Concatenate retrieved chunks into a prompt context string.

    Each chunk is rendered as ``"(Page N) text"`` followed by a blank line.
    The first three chunks are always included, whatever their size; later
    chunks are added in order until one would push the running total past
    ``max_chars``, at which point assembly stops.

    Args:
        retrieved_chunks: Ranked list of dicts with ``"page"`` and ``"text"``.
        max_chars: Character budget applied from the fourth chunk onward.

    Returns:
        The assembled context string (empty when no chunks are given).
    """
    always_kept = 3
    rendered = []
    total = 0
    for position, chunk in enumerate(retrieved_chunks):
        block = f"(Page {chunk['page']}) {chunk['text']}\n\n"
        # The budget only applies once the guaranteed leading chunks are in.
        if position >= always_kept and total + len(block) > max_chars:
            break
        rendered.append(block)
        total += len(block)
    return "".join(rendered)
def generate_answer(query, context):
    """Ask the Groq chat model to answer ``query`` from ``context``.

    Args:
        query: The user's question.
        context: Document excerpts to ground the answer in.

    Returns:
        The message content of the first completion choice.
    """
    prompt = f"""
You are a document-based assistant.
Use the context to answer the question clearly.
If the answer is partially available, summarize it.
If the answer is not present, say 'Not found in the document'.
Context:
{context}
Question:
{query}
"""
    # Single-turn request: the whole instruction + context goes in one user message.
    messages = [{"role": "user", "content": prompt}]

    client = Groq()
    completion = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=messages,
        temperature=0.3,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
| # ========================================================== | |
| # APP LOGIC | |
| # ========================================================== | |
import tempfile

if uploaded_file:
    # Streamlit re-executes the script on every interaction (including typing
    # a question), so the index is kept in session state and rebuilt only
    # when a different file is uploaded.
    file_key = (uploaded_file.name, uploaded_file.size)
    if st.session_state.get("indexed_file_key") != file_key:
        with st.spinner("π Reading document..."):
            # A unique temp file per upload: a fixed path would be shared,
            # and overwritten, by concurrent sessions.
            with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp:
                tmp.write(uploaded_file.getbuffer())
                temp_path = tmp.name
            try:
                pages = read_word(temp_path)
            finally:
                os.remove(temp_path)

        # Drop empty chunks so a blank document is reported instead of
        # being embedded and indexed.
        chunks = [c for c in chunk_text(pages) if c["text"]]
        if not chunks:
            st.warning("No readable text was found in the uploaded document.")
            st.stop()

        with st.spinner("βοΈ Chunking & embedding document..."):
            # The former tokenize_chunks(chunks) call was dropped here: its
            # return value was discarded, so it only cost a tokenizer load.
            embedder, embeddings = create_embeddings(chunks)
            index = store_embeddings(embeddings)

        st.session_state["rag_index"] = (chunks, embedder, index)
        st.session_state["indexed_file_key"] = file_key

    chunks, embedder, index = st.session_state["rag_index"]
    st.success("β Document indexed successfully")

    query = st.text_input("β Ask a question")
    if query:
        with st.spinner("π€ Generating answer..."):
            retrieved_chunks = retrieve_chunks(query, embedder, index, chunks)
            context = build_safe_context(retrieved_chunks)
            answer = generate_answer(query, context)
        st.markdown("### β Answer")
        st.write(answer)