Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import pdfplumber | |
| from io import BytesIO | |
| from PIL import Image | |
| from docx import Document | |
| import pandas as pd | |
| import numpy as np | |
| import faiss | |
| from sentence_transformers import SentenceTransformer | |
| from huggingface_hub import InferenceClient | |
# ============== CONFIG ==============
# Default maximum number of characters per chunk.
CHUNK_SIZE = 500
# Default number of characters shared between consecutive chunks.
CHUNK_OVERLAP = 50


# ============== TEXT PROCESSING ==============
def chunk_text(text: str, chunk_size: "int | None" = None, overlap: "int | None" = None) -> list[dict]:
    """Split *text* into overlapping, whitespace-normalised chunks.

    All runs of whitespace are collapsed to single spaces first. Each chunk
    holds at most ``chunk_size`` characters; when a chunk is not the last one
    and contains a ``". "`` sentence break in its second half, the chunk is
    cut right after that period so sentences are less likely to be split.
    Consecutive chunks share ``overlap`` characters.

    Args:
        text: Raw text to split. Empty or whitespace-only input yields ``[]``.
        chunk_size: Maximum characters per chunk. ``None`` uses ``CHUNK_SIZE``
            as it is at call time.
        overlap: Characters repeated at the start of the next chunk. ``None``
            uses ``CHUNK_OVERLAP`` as it is at call time.

    Returns:
        A list of ``{"content": str, "chunk_index": int}`` dicts in document
        order, with ``chunk_index`` counting up from 0.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    shared = CHUNK_OVERLAP if overlap is None else overlap
    if size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= shared < size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    if not text or not text.strip():
        return []

    # str.split() with no arguments already drops leading/trailing whitespace.
    text = " ".join(text.split())
    total = len(text)

    chunks = []
    start = 0
    while start < total:
        end = start + size
        piece = text[start:end]
        if end < total:
            # Prefer ending on a sentence boundary, but only if that keeps
            # more than half of the window; otherwise chunks get too small.
            cut = piece.rfind(". ")
            if cut > size * 0.5:
                piece = piece[:cut + 1]
                end = start + cut + 1
        chunks.append({"content": piece.strip(), "chunk_index": len(chunks)})
        if end >= total:
            # This chunk reached the end of the text; nothing is left over.
            break
        next_start = end - shared
        # With a large overlap and an early sentence cut, stepping back by the
        # overlap could fail to advance; skip the overlap for that step so the
        # loop always terminates.
        start = next_start if next_start > start else end
    return chunks
| # ============== DOCUMENT PARSERS ============== | |
def parse_pdf(file_bytes) -> str:
    """Extract the text of a PDF given as raw bytes.

    Each page that yields non-blank text contributes one section of the form
    ``"[Page N]\\n<text>"`` (N is 1-based); pages without extractable text are
    skipped. Sections are separated by a blank line.
    """
    sections = []
    with pdfplumber.open(BytesIO(file_bytes)) as pdf:
        for page_number, page in enumerate(pdf.pages, start=1):
            extracted = page.extract_text()
            # extract_text() may give None/"" for pages with no text layer.
            if not extracted or not extracted.strip():
                continue
            sections.append(f"[Page {page_number}]\n{extracted}")
    return "\n\n".join(sections)
def parse_docx(file_bytes) -> str:
    """Return the non-blank paragraphs of a DOCX file, separated by blank lines.

    Only ``doc.paragraphs`` is read; nothing else in the document is visited.
    """
    document = Document(BytesIO(file_bytes))
    kept = []
    for paragraph in document.paragraphs:
        content = paragraph.text
        if content.strip():
            kept.append(content)
    return "\n\n".join(kept)
def parse_txt(file_bytes) -> str:
    """Decode an uploaded plain-text file to ``str``.

    The bytes are decoded as UTF-8. A leading byte-order mark is dropped so it
    does not end up as a stray U+FEFF at the start of the first chunk, and
    bytes that are not valid UTF-8 become U+FFFD instead of raising, so a file
    saved in another encoding is still indexed rather than rejected.
    """
    # "utf-8-sig" behaves like UTF-8 but strips an optional leading BOM.
    return file_bytes.decode("utf-8-sig", errors="replace")
def parse_image(file_bytes) -> str:
    """Return a fixed placeholder for image uploads.

    ``file_bytes`` is accepted so the signature matches the other parsers, but
    it is never read: no OCR is performed here.
    """
    placeholder = "[Image uploaded - OCR not available in cloud version]"
    return placeholder
def parse_csv(file_bytes, max_rows: int = 50) -> str:
    """Render a CSV file as a plain-text summary suitable for chunking.

    The output lists the column names, the total row count, and then up to
    ``max_rows`` rows formatted as ``"col: value | col: value"``.

    Args:
        file_bytes: Raw CSV bytes, parsed with ``pandas.read_csv`` defaults.
        max_rows: Maximum number of leading rows to include in the text. The
            "Total rows" line always reports the full row count.

    Returns:
        The summary as a single newline-joined string.
    """
    df = pd.read_csv(BytesIO(file_bytes))
    # Coerce labels to str so joining cannot fail on non-string column labels.
    column_names = [str(col) for col in df.columns]
    lines = [
        f"Columns: {', '.join(column_names)}",
        f"Total rows: {len(df)}",
        "\nData:",
    ]
    lines.extend(
        " | ".join(f"{col}: {val}" for col, val in row.items())
        for _, row in df.head(max_rows).iterrows()
    )
    return "\n".join(lines)
def parse_document(file_bytes, filename) -> dict:
    """Parse an uploaded file and split its text into tagged chunks.

    The parser is chosen from the lower-cased text after the last ``"."`` in
    ``filename``; unrecognised extensions produce empty text and no chunks.

    Returns:
        ``{"text": <full extracted text>, "chunks": <list of chunk dicts>}``
        where each chunk additionally carries ``"source"`` (the filename) and
        ``"file_type"`` (the detected extension).
    """
    extension = filename.split(".")[-1].lower()

    # Extension -> parser lookup; the image formats share one placeholder parser.
    parsers = {
        "pdf": parse_pdf,
        "docx": parse_docx,
        "txt": parse_txt,
        "jpg": parse_image,
        "jpeg": parse_image,
        "png": parse_image,
        "csv": parse_csv,
    }
    parser = parsers.get(extension)
    text = parser(file_bytes) if parser is not None else ""

    chunks = chunk_text(text)
    for chunk in chunks:
        chunk.update(source=filename, file_type=extension)
    return {"text": text, "chunks": chunks}
| # ============== EMBEDDING SERVICE ============== | |
@st.cache_resource
def load_embedding_model():
    """Return the shared ``all-MiniLM-L6-v2`` sentence-embedding model.

    ``embed_texts`` calls this on every upload and every query; without
    caching the model would be constructed again each time. Decorating with
    ``st.cache_resource`` makes Streamlit build it once and hand back the same
    instance on later calls and script reruns.
    """
    return SentenceTransformer("all-MiniLM-L6-v2")
def embed_texts(texts: list[str]) -> np.ndarray:
    """Encode *texts* with the embedding model and return the resulting array.

    The result is whatever ``model.encode`` returns for a list input
    (presumably one row per text; verify against sentence-transformers).
    """
    return load_embedding_model().encode(texts)
| # ============== VECTOR STORE ============== | |
class SimpleVectorStore:
    """Minimal in-memory vector store backed by a flat FAISS L2 index.

    ``documents[i]`` is the chunk dict whose embedding was added as row ``i``
    of ``index``, so search hits map back to chunks by position.
    """

    def __init__(self):
        # FAISS index; created lazily on the first non-empty add_documents().
        self.index = None
        # Chunk dicts, in the same order as the rows of the index.
        self.documents = []
        # Embedding width. 384 is only the initial default (presumably the
        # width of the configured embedding model); it is replaced by the
        # actual width of the first batch when the index is created.
        self.dimension = 384

    def add_documents(self, chunks: list[dict]):
        """Embed and index *chunks*; return how many were added.

        Args:
            chunks: Dicts that each carry a ``"content"`` string. The same
                dict objects are stored, not copies.

        Returns:
            The number of chunks added (0 for an empty list).

        Raises:
            ValueError: If the embeddings' width differs from the width the
                existing index was built with.
        """
        if not chunks:
            return 0
        texts = [c["content"] for c in chunks]
        embeddings = embed_texts(texts).astype("float32")
        width = int(embeddings.shape[1])
        if self.index is None:
            # Size the index from the data instead of trusting a constant, so
            # switching the embedding model cannot silently mismatch.
            self.dimension = width
            self.index = faiss.IndexFlatL2(self.dimension)
        elif width != self.dimension:
            raise ValueError(
                f"Embedding dimension {width} does not match index dimension {self.dimension}"
            )
        self.index.add(embeddings)
        self.documents.extend(chunks)
        return len(chunks)

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Return up to *top_k* stored chunks nearest to *query*.

        Each hit is a shallow copy of the stored chunk with an added
        ``"score"`` key holding the distance reported by the index (the index
        is ``IndexFlatL2``, so smaller should mean closer). Returns ``[]``
        when nothing is indexed or ``top_k`` is not positive.
        """
        if self.index is None or self.index.ntotal == 0 or top_k <= 0:
            return []
        # Asking for more neighbours than exist only yields padding entries.
        k = min(top_k, self.index.ntotal)
        query_embedding = embed_texts([query]).astype("float32")
        distances, indices = self.index.search(query_embedding, k)
        results = []
        for distance, idx in zip(distances[0], indices[0]):
            # Skip out-of-range ids (e.g. negative "no result" markers).
            if 0 <= idx < len(self.documents):
                doc = self.documents[idx].copy()
                doc["score"] = float(distance)
                results.append(doc)
        return results

    def clear(self):
        """Drop the index and all stored chunks."""
        self.index = None
        self.documents = []
| # ============== LLM SERVICE ============== | |
def get_llm_client():
    """Build the HuggingFace inference client for the chat model.

    The API token is read from the ``HUGGINGFACE_API_KEY`` environment
    variable first and from Streamlit secrets second. If neither provides
    one, the client is still created with an empty token.
    """
    token = os.getenv("HUGGINGFACE_API_KEY", "")
    if not token:
        try:
            token = st.secrets["HUGGINGFACE_API_KEY"]
        except Exception:
            # Best effort: a missing secrets file or key just means "no
            # token". Catching Exception (not a bare except) lets
            # KeyboardInterrupt/SystemExit propagate.
            token = ""
    return InferenceClient(model="HuggingFaceH4/zephyr-7b-beta", token=token)
def generate_answer(question: str, context: str) -> str:
    """Ask the chat model to answer *question* using *context*.

    The context and question are embedded in a single user message. Any
    exception raised while creating the client or calling the model is not
    propagated; it is returned as the string ``"Error: <message>"`` instead.
    """
    prompt = f"""You are a helpful assistant. Answer based on the context below.
CONTEXT:
{context}
QUESTION: {question}
ANSWER:"""
    user_message = {"role": "user", "content": prompt}
    try:
        completion = get_llm_client().chat_completion(
            messages=[user_message],
            max_tokens=512,
            temperature=0.7,
        )
        return completion.choices[0].message.content
    except Exception as exc:
        # Deliberate best effort: the caller displays whatever string comes back.
        return f"Error: {exc}"
| # ============== STREAMLIT APP ============== | |
# Page header.
# NOTE(review): the icon glyphs in the labels below look mis-encoded (probably
# emoji); they are kept exactly as found - confirm against the original file.
st.set_page_config(page_title="Smart RAG API", page_icon="π", layout="wide")
st.title("π Smart RAG API")
st.markdown("Upload documents and ask questions - Powered by HuggingFace")

# One vector store per session, created on first run and reused afterwards.
if "vector_store" not in st.session_state:
    st.session_state.vector_store = SimpleVectorStore()
store = st.session_state.vector_store

# Sidebar: status, chunk count and a reset button.
with st.sidebar:
    st.header("π Status")
    st.success("β Running")
    # The metric is labelled "Documents" but counts stored chunks.
    st.metric("Documents", len(store.documents))
    if st.button("ποΈ Clear All"):
        store.clear()
        st.rerun()
    st.divider()
    st.markdown("**Supported:** PDF, DOCX, TXT, CSV")

# Main area: upload on the left, question answering on the right.
upload_col, ask_col = st.columns(2)

with upload_col:
    st.header("π Upload")
    uploaded_file = st.file_uploader("Choose file", type=["pdf", "docx", "txt", "csv"])
    # The Process button is only rendered once a file has been chosen.
    if uploaded_file:
        if st.button("π€ Process", type="primary"):
            with st.spinner("Processing..."):
                try:
                    parsed = parse_document(uploaded_file.getvalue(), uploaded_file.name)
                    added = store.add_documents(parsed["chunks"])
                    st.success(f"β Added {added} chunks")
                except Exception as e:
                    # Show parsing/indexing failures in the page instead of crashing.
                    st.error(f"Error: {e}")

with ask_col:
    st.header("π¬ Ask")
    question = st.text_area("Question:", placeholder="What is this about?")
    top_k = st.slider("Sources", 1, 5, 3)
    ask_clicked = st.button("π Answer", type="primary")
    if ask_clicked and not question:
        st.warning("Enter a question")
    elif ask_clicked and not store.documents:
        st.warning("Upload documents first")
    elif ask_clicked:
        with st.spinner("Thinking..."):
            results = store.search(question, top_k)
            if results:
                # Each retrieved chunk is prefixed with its source filename.
                context = "\n\n".join(f"[{r['source']}]: {r['content']}" for r in results)
                answer = generate_answer(question, context)
                st.subheader("π Answer")
                st.write(answer)
                st.subheader("π Sources")
                for r in results:
                    with st.expander(r["source"]):
                        # Preview only the first 300 characters of each source.
                        st.write(r["content"][:300])

st.divider()
st.caption("Smart RAG API - FAISS + HuggingFace")