Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import re | |
| import gradio as gr | |
| from pypdf import PdfReader | |
| import faiss | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import pipeline | |
| import fitz # PyMuPDF | |
| from PIL import Image | |
| import easyocr | |
| import cv2 | |
| APP_TITLE = "RobotInsight - RAG Bot (EasyOCR + Preview)" | |
| # Models | |
| EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2" | |
| GEN_MODEL_NAME = "google/flan-t5-base" # needs sentencepiece | |
| # RAG | |
| CHUNK_SIZE = 450 | |
| CHUNK_OVERLAP = 80 | |
| TOP_K = 6 | |
| MAX_CONTEXT_CHARS = 6500 | |
| # OCR | |
| # EasyOCR supports multiple languages; keep ["en"] for speed | |
| OCR_LANGS = ["en"] | |
| # Globals | |
| embedder = None | |
| generator = None | |
| ocr_reader = None | |
| faiss_index = None | |
| doc_chunks = [] | |
| is_index_ready = False | |
| last_ingested_text = "" | |
| last_debug_chunks = "" | |
# -----------------------------
# Init models
# -----------------------------
def _init_models():
    """Lazily build the embedding model and the generation pipeline (CPU)."""
    global embedder, generator
    # The two guards are independent; each model is created at most once.
    if generator is None:
        generator = pipeline("text2text-generation", model=GEN_MODEL_NAME, device=-1)
    if embedder is None:
        embedder = SentenceTransformer(EMBEDDING_MODEL_NAME)
def _init_ocr():
    """Lazily build the EasyOCR reader (gpu=False for HF CPU Spaces)."""
    global ocr_reader
    if ocr_reader is not None:
        return
    ocr_reader = easyocr.Reader(OCR_LANGS, gpu=False)
# -----------------------------
# Text utils
# -----------------------------
def clean_text(text: str) -> str:
    """Normalize whitespace: drop NULs, collapse runs of spaces/tabs,
    squeeze 3+ consecutive newlines to a blank line, and strip the ends."""
    if not text:
        return ""
    cleaned = text.replace("\x00", " ")
    cleaned = re.sub(r"[ \t]+", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()
def extract_text_from_pdf_text(pdf_path: str) -> str:
    """Try the PDF's embedded text layer first (fast; no OCR).

    Returns "" on any parse failure so callers can fall back to OCR.
    """
    try:
        reader = PdfReader(pdf_path)
        page_texts = [(page.extract_text() or "") for page in reader.pages]
        return clean_text("\n".join(page_texts))
    except Exception:
        # Deliberate best-effort: a broken/encrypted PDF just yields "".
        return ""
def extract_text_from_txt(path: str) -> str:
    """Read a UTF-8 text file (undecodable bytes ignored) and normalize it."""
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        raw = handle.read()
    return clean_text(raw)
# -----------------------------
# EasyOCR for scanned PDFs
# -----------------------------
def ocr_pdf_easyocr(pdf_path: str, max_pages: int = 10, dpi: int = 250) -> str:
    """
    Render each page to an image and run EasyOCR.
    Works well for full-page scanned documents (unlike TrOCR).

    Args:
        pdf_path: path to the PDF to OCR.
        max_pages: upper bound on pages processed (cost control).
        dpi: render resolution; higher improves OCR but is slower.

    Returns:
        Cleaned text of all pages that produced any OCR output,
        pages separated by blank lines.
    """
    _init_ocr()
    doc = fitz.open(pdf_path)
    try:
        pages_to_process = min(len(doc), int(max_pages))
        extracted_pages = []
        zoom = dpi / 72.0  # PDF user space is 72 dpi; scale up to requested dpi
        mat = fitz.Matrix(zoom, zoom)
        for i in range(pages_to_process):
            page = doc.load_page(i)
            pix = page.get_pixmap(matrix=mat, alpha=False)
            # alpha=False yields packed RGB samples, hence the 3-channel reshape.
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, 3)
            # Preprocess improves OCR quality: denoise, then Otsu binarization.
            gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
            gray = cv2.bilateralFilter(gray, 9, 75, 75)
            _, th = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
            # EasyOCR expects RGB or grayscale; we pass thresholded grayscale
            results = ocr_reader.readtext(th, detail=0, paragraph=True)
            page_text = "\n".join([r.strip() for r in results if r and r.strip()])
            page_text = clean_text(page_text)
            if page_text:
                extracted_pages.append(page_text)
    finally:
        # Close the document even when rendering/OCR raises — the original
        # only closed on the success path, leaking the file handle on error.
        doc.close()
    return clean_text("\n\n".join(extracted_pages))
# -----------------------------
# Chunking
# -----------------------------
def chunk_text(text: str, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """
    Split *text* into overlapping character windows.

    Args:
        text: raw text; it is normalized with clean_text() first.
        chunk_size: window length in characters (minimum 1).
        overlap: characters shared between consecutive windows.

    Returns:
        List of non-empty, stripped chunk strings (possibly empty list).
    """
    text = clean_text(text)
    if not text:
        return []
    chunk_size = max(1, int(chunk_size))
    # Clamp overlap into [0, chunk_size - 1]: with overlap >= chunk_size the
    # original loop never advanced `start` and spun forever.
    overlap = min(max(0, int(overlap)), chunk_size - 1)
    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(start + chunk_size, n)
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        if end == n:
            break
        # Guaranteed to move forward because overlap < chunk_size.
        start = end - overlap
    return chunks
# -----------------------------
# FAISS index (cosine similarity)
# -----------------------------
def build_faiss_index(text_chunks):
    """
    Embed *text_chunks* and build an inner-product FAISS index.

    Vectors are L2-normalized first, so inner product == cosine similarity.
    Sets the module globals faiss_index / doc_chunks / is_index_ready.
    An empty input leaves the index unset instead of crashing.
    """
    global faiss_index, doc_chunks, is_index_ready
    _init_models()
    doc_chunks = list(text_chunks)
    if not doc_chunks:
        # Guard: encoding an empty list yields a zero-row array, and
        # vectors.shape[1] below would raise IndexError.
        faiss_index = None
        is_index_ready = False
        return
    vectors = embedder.encode(doc_chunks, convert_to_numpy=True, show_progress_bar=False).astype("float32")
    faiss.normalize_L2(vectors)  # in-place normalization for cosine scoring
    dim = vectors.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(vectors)
    faiss_index = index
    is_index_ready = True
def retrieve(query: str, k=TOP_K):
    """Return the top-*k* chunks for *query* as dicts with keys
    chunk_id / score / chunk, or [] when no index is available."""
    if not is_index_ready or faiss_index is None or not doc_chunks:
        return []
    q_vec = embedder.encode([query], convert_to_numpy=True, show_progress_bar=False).astype("float32")
    faiss.normalize_L2(q_vec)
    scores, ids = faiss_index.search(q_vec, k)
    # FAISS pads results with id -1 when fewer than k vectors exist.
    return [
        {"chunk_id": cid, "score": float(sc), "chunk": doc_chunks[cid]}
        for cid, sc in zip(ids[0].tolist(), scores[0].tolist())
        if cid != -1
    ]
# -----------------------------
# Generate answer
# -----------------------------
def generate_answer(query: str, retrieved_chunks):
    """
    Generate a grounded answer from *retrieved_chunks* with the seq2seq LM.

    Args:
        query: the user question.
        retrieved_chunks: dicts from retrieve() with a "chunk" key.

    Returns:
        The model's answer string, or a fixed fallback when nothing
        was retrieved.
    """
    if not retrieved_chunks:
        return "Not found in this document."
    # Build clean context (no chunk labels)
    context = "\n".join([r["chunk"] for r in retrieved_chunks])
    context = context[:MAX_CONTEXT_CHARS]
    prompt = f"""Answer the question clearly and concisely using the document text.
Do NOT repeat raw OCR text or headers.
Return ONLY the final answer sentence.
DOCUMENT:
{context}
QUESTION:
{query}
ANSWER:
"""
    # max_new_tokens is the explicit, recommended length control for
    # generation; passing max_length here triggers a deprecation warning in
    # recent transformers and its meaning is ambiguous across model types.
    out = generator(prompt, max_new_tokens=128, do_sample=False)[0]["generated_text"]
    return out.strip()
# -----------------------------
# Ingest
# -----------------------------
def ingest_files(files, force_ocr, ocr_max_pages, ocr_dpi):
    """Extract text from the uploaded files, chunk it, and build the index.

    Returns a human-readable, newline-joined status log.
    """
    global is_index_ready, faiss_index, doc_chunks, last_ingested_text, last_debug_chunks
    t0 = time.time()
    # Invalidate any previous index/state before re-ingesting.
    is_index_ready = False
    faiss_index = None
    doc_chunks = []
    last_ingested_text = ""
    last_debug_chunks = ""
    if not files:
        return "β Please upload at least one PDF/TXT/MD file."
    log = [f"π RobotInsight Ingest Started | Files: {len(files)}"]
    collected = []
    for upload in files:
        file_path = upload.name
        suffix = os.path.splitext(file_path)[1].lower()
        log.append(f"β‘οΈ Reading: {os.path.basename(file_path)}")
        try:
            extracted = ""
            if suffix == ".pdf":
                # 1) Fast path: the PDF's embedded text layer.
                extracted = extract_text_from_pdf_text(file_path)
                # 2) Fall back to OCR when empty or when the user forces it.
                if force_ocr or not extracted.strip():
                    log.append(f"π EasyOCR running (max_pages={int(ocr_max_pages)}, dpi={int(ocr_dpi)})...")
                    extracted = ocr_pdf_easyocr(file_path, max_pages=int(ocr_max_pages), dpi=int(ocr_dpi))
            elif suffix in [".txt", ".md"]:
                extracted = extract_text_from_txt(file_path)
            else:
                log.append(f"β οΈ Unsupported file type: {suffix}")
                continue
            extracted = clean_text(extracted)
            if extracted:
                collected.append(extracted)
            else:
                log.append("β οΈ No usable text extracted from this file.")
        except Exception as e:
            # Best-effort per file: report and move on to the next upload.
            log.append(f"β Error: {type(e).__name__}: {e}")
    full_text = clean_text("\n\n".join(collected))
    last_ingested_text = full_text
    if not full_text:
        return "\n".join(log + ["β No readable text found. Increase OCR pages/DPI or verify PDF."])
    log.append("βοΈ Chunking text...")
    chunks = chunk_text(full_text)
    log.append(f"β Chunks created: {len(chunks)}")
    log.append("π§ Building embeddings + FAISS index...")
    build_faiss_index(chunks)
    log.append(f"π Ingest Complete in {time.time() - t0:.2f} seconds")
    log.append("β Ready. Click Preview Extracted Text to verify it contains EPA text.")
    return "\n".join(log)
# -----------------------------
# Preview / Debug
# -----------------------------
def preview_ingested_text():
    """Return the first 4000 chars of the most recently ingested text."""
    text = last_ingested_text
    if not text.strip():
        return "β No text ingested yet. Upload and click Ingest."
    return text[:4000]
def debug_chunks_used():
    """Return the chunks retrieved for the most recent question."""
    if last_debug_chunks.strip():
        return last_debug_chunks
    return "β Ask a question first, then click Debug."
# -----------------------------
# Chat
# -----------------------------
def respond(user_message, history):
    """Chat callback: retrieve context, generate an answer, append both
    turns to *history* (messages format), and clear the input box."""
    global last_debug_chunks
    if history is None:
        history = []
    message = (user_message or "").strip()
    if not message:
        return history, ""
    history.append({"role": "user", "content": message})
    try:
        _init_models()
        if not is_index_ready:
            history.append({"role": "assistant", "content": "Please upload a document and click **Ingest** first."})
            return history, ""
        hits = retrieve(message, TOP_K)
        # Stash retrieval details so the Debug button can display them.
        last_debug_chunks = "\n\n".join(
            f"[Chunk {hit['chunk_id']} score={hit['score']:.2f}]\n{hit['chunk']}" for hit in hits
        )
        reply = generate_answer(message, hits)
        history.append({"role": "assistant", "content": reply})
    except Exception as e:
        # Surface the failure in-chat rather than crashing the UI callback.
        history.append({"role": "assistant", "content": f"β Error: {type(e).__name__}: {e}"})
    return history, ""
def reset_index():
    """Drop the FAISS index and all cached ingest/debug state."""
    global faiss_index, doc_chunks, is_index_ready, last_ingested_text, last_debug_chunks
    faiss_index, doc_chunks, is_index_ready = None, [], False
    last_ingested_text, last_debug_chunks = "", ""
    return "π§Ή Reset done."
# -----------------------------
# UI
# -----------------------------
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(
        f"# π€ {APP_TITLE}\n"
        "Upload β Ingest β Preview extracted text β Ask questions.\n\n"
        "**If your PDF is scanned/image-based (like the EPA sample letter), turn ON Force OCR.**"
    )
    file_upload = gr.File(file_count="multiple", file_types=[".pdf", ".txt", ".md"], label="Upload PDF / TXT / MD")
    with gr.Row():
        force_ocr = gr.Checkbox(value=True, label="Force OCR (recommended for scanned PDFs)")
        ocr_max_pages = gr.Slider(1, 30, value=10, step=1, label="OCR Max Pages")
        ocr_dpi = gr.Slider(150, 350, value=250, step=10, label="OCR DPI")
    with gr.Row():
        ingest_btn = gr.Button("β Ingest", variant="primary")
        reset_btn = gr.Button("π§Ή Reset")
    ingest_status = gr.Textbox(label="Ingest Status", lines=10)
    with gr.Row():
        preview_btn = gr.Button("π Preview Extracted Text")
        debug_btn = gr.Button("π§ͺ Debug: Show Retrieved Chunks")
    preview_box = gr.Textbox(label="Extracted Text Preview (first 4000 chars)", lines=12)
    debug_box = gr.Textbox(label="Retrieved Chunks for last question", lines=12)
    gr.Markdown("## π¬ Chat")
    # respond() appends {"role": ..., "content": ...} dicts, so the Chatbot
    # must use the "messages" format; the Gradio 4 default ("tuples") does
    # not accept dict messages. (Requires gradio >= 4 — verify Space runtime.)
    chatbot = gr.Chatbot(label="RobotInsight Chat", height=360, type="messages")
    user_input = gr.Textbox(label="Ask a question", placeholder="Example: Who signed the letter?")
    send_btn = gr.Button("Send")
    # Wire callbacks: ingest/reset write to the status box; chat updates both
    # the chatbot history and clears the input textbox.
    ingest_btn.click(fn=ingest_files, inputs=[file_upload, force_ocr, ocr_max_pages, ocr_dpi], outputs=[ingest_status])
    reset_btn.click(fn=reset_index, inputs=[], outputs=[ingest_status])
    preview_btn.click(fn=preview_ingested_text, inputs=[], outputs=[preview_box])
    debug_btn.click(fn=debug_chunks_used, inputs=[], outputs=[debug_box])
    send_btn.click(fn=respond, inputs=[user_input, chatbot], outputs=[chatbot, user_input])
    user_input.submit(fn=respond, inputs=[user_input, chatbot], outputs=[chatbot, user_input])
demo.launch()