Spaces: Build error
```python
import os
import re

import gradio as gr
import faiss
import numpy as np
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
from huggingface_hub import InferenceClient

# -----------------------------
# Config
# -----------------------------
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN") or os.getenv("HF_TOKEN")

# LLM (keep same default, but we will call it via chat_completion, not text_generation)
HF_LLM_MODEL = os.getenv("HF_LLM_MODEL", "mistralai/Mistral-7B-Instruct-v0.3")

# IMPORTANT:
# If you are explicitly using Together as a provider, set this variable in Space secrets:
#   HF_PROVIDER="together"
# If you leave it empty, the Hugging Face default provider is used.
HF_PROVIDER = os.getenv("HF_PROVIDER", "").strip() or None

EMBED_MODEL_NAME = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
TOP_K = 4

# -----------------------------
# Helpers
# -----------------------------
def clean_text(s: str) -> str:
    s = re.sub(r"\s+", " ", s)
    return s.strip()


def chunk_text(text: str, chunk_size=900, overlap=150):
    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(n, start + chunk_size)
        chunks.append(text[start:end])
        start = end - overlap
        if start < 0:
            start = 0
        if end == n:
            break
    return [c for c in (clean_text(x) for x in chunks) if len(c) > 30]
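
# Example (illustrative numbers): with chunk_size=900 and overlap=150, a
# 2,000-character text yields the windows [0:900], [750:1650], [1500:2000].
# Each window shares 150 characters with its predecessor, so short spans
# (under the 150-character overlap) that cross a chunk boundary are still
# contained whole in the following window.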

def pdf_to_text(pdf_path: str) -> str:
    reader = PdfReader(pdf_path)
    pages = []
    for p in reader.pages:
        t = p.extract_text() or ""
        if t.strip():
            pages.append(t)
    return "\n".join(pages)


def build_faiss_index(chunks, embedder):
    vectors = embedder.encode(chunks, convert_to_numpy=True, normalize_embeddings=True)
    dim = vectors.shape[1]
    index = faiss.IndexFlatIP(dim)  # cosine similarity since normalized
    index.add(vectors.astype(np.float32))
    return index, vectors


def retrieve(query, embedder, index, chunks, k=TOP_K):
    qv = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
    scores, ids = index.search(qv, k)
    hits = []
    for score, idx in zip(scores[0], ids[0]):
        if idx == -1:
            continue
        hits.append((float(score), chunks[int(idx)]))
    return hits
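
# Note: index.search returns score/id arrays of shape (1, k); when the index
# holds fewer than k vectors, FAISS pads the missing ids with -1, which the
# loop above skips.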

def hf_generate(client: InferenceClient, prompt: str) -> str:
    """
    FIX:
    The Together provider doesn't support `text_generation` for this model.
    Use chat_completion (conversational) instead.
    """
    resp = client.chat_completion(
        model=HF_LLM_MODEL,
        messages=[
            {"role": "system", "content": "You are a helpful assistant. Answer using ONLY the provided context."},
            {"role": "user", "content": prompt},
        ],
        max_tokens=450,
        temperature=0.2,
        top_p=0.9,
    )
    return resp.choices[0].message.content.strip()
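
# chat_completion returns an OpenAI-style completion object; the generated
# text lives at resp.choices[0].message.content, as read above.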

# -----------------------------
# App logic (cached state)
# -----------------------------
embedder = SentenceTransformer(EMBED_MODEL_NAME)


def on_upload(pdf_path):
    if not pdf_path:
        return None, None, "Please upload a PDF."
    text = pdf_to_text(pdf_path)
    if not text.strip():
        return None, None, "Could not extract text from this PDF (it may be scanned). Try a text-based PDF."
    chunks = chunk_text(text)
    if len(chunks) < 2:
        return None, None, "Not enough extractable text to build a RAG index."
    index, _ = build_faiss_index(chunks, embedder)
    return index, chunks, f"✅ Indexed {len(chunks)} chunks. Now ask a question."


def answer_question(index, chunks, question):
    # FIX: gate on index/chunks, NOT on the original PDF file
    if index is None or chunks is None:
        return "Upload and index a PDF first."
    if not question or not question.strip():
        return "Type a question."
    if not HF_TOKEN:
        return (
            "HF token not found. Go to Space → Settings → Variables and secrets → "
            "add a Secret named HUGGINGFACEHUB_API_TOKEN, then restart the Space."
        )
    hits = retrieve(question, embedder, index, chunks, k=TOP_K)
    context = "\n\n".join([f"[{i+1}] {h[1]}" for i, h in enumerate(hits)])
    prompt = f"""Answer using ONLY the context.
If the answer is not in the context, say: "I don't know from the provided document."
Question: {question}
Context:
{context}
Answer:"""
    # If HF_PROVIDER is set to "together", this routes to Together;
    # if unset, the Hugging Face default provider is used.
    if HF_PROVIDER:
        client = InferenceClient(provider=HF_PROVIDER, token=HF_TOKEN)
    else:
        client = InferenceClient(token=HF_TOKEN)
    ans = hf_generate(client, prompt)
    sources = "\n\n".join(
        [f"**Source {i+1} (score={hits[i][0]:.3f})**\n{hits[i][1][:600]}..." for i in range(len(hits))]
    )
    return f"### Answer\n{ans}\n\n---\n### Retrieved Sources\n{sources}"


# -----------------------------
# UI
# -----------------------------
with gr.Blocks(title="Agentic Document Intelligence (HF RAG)") as demo:
    gr.Markdown(
        "# 📄 Agentic Document Intelligence\n"
        "Upload a PDF and ask questions (RAG) using the Hugging Face Inference API.\n\n"
        "**Tip:** If you use Together as a provider, set the Space secret `HF_PROVIDER=together`."
    )
    pdf = gr.File(label="Upload PDF", type="filepath")
    status = gr.Markdown()
    index_state = gr.State(None)
    chunks_state = gr.State(None)

    pdf.change(
        fn=on_upload,
        inputs=[pdf],
        outputs=[index_state, chunks_state, status],
    )

    question = gr.Textbox(label="Ask a question", placeholder="e.g., What is the payment term?")
    out = gr.Markdown()
    btn = gr.Button("Run")

    btn.click(
        fn=answer_question,
        inputs=[index_state, chunks_state, question],
        outputs=[out],
    )

demo.launch()
```
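
If the Space still fails to build, a common cause is a missing or incomplete `requirements.txt`. A minimal sketch matching the imports above (unpinned; assumes CPU hardware, so FAISS comes from the `faiss-cpu` wheel rather than `faiss`):

```text
gradio
faiss-cpu
numpy
pypdf
sentence-transformers
huggingface_hub
```

Also add the `HUGGINGFACEHUB_API_TOKEN` secret (and optionally `HF_PROVIDER`) under Settings → Variables and secrets before restarting the Space, since the app reads both at startup.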