import os
import time
import re

import gradio as gr
from pypdf import PdfReader
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import fitz  # PyMuPDF
from PIL import Image
import easyocr
import cv2

APP_TITLE = "RobotInsight - RAG Bot (EasyOCR + Preview)"

# Models
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
GEN_MODEL_NAME = "google/flan-t5-base"  # needs sentencepiece

# RAG
CHUNK_SIZE = 450
CHUNK_OVERLAP = 80
TOP_K = 6
MAX_CONTEXT_CHARS = 6500

# OCR
# EasyOCR supports multiple languages; keep ["en"] for speed
OCR_LANGS = ["en"]

# Globals (lazy-initialized models and per-session index state)
embedder = None            # SentenceTransformer instance
generator = None           # transformers text2text pipeline
ocr_reader = None          # easyocr.Reader instance
faiss_index = None         # faiss.IndexFlatIP over normalized chunk vectors
doc_chunks = []            # list[str] aligned with the index ids
is_index_ready = False
last_ingested_text = ""    # full extracted text, for the preview panel
last_debug_chunks = ""     # retrieved chunks of the last question, for debug


# -----------------------------
# Init models
# -----------------------------
def _init_models():
    """Lazily load the embedder and the generator (idempotent)."""
    global embedder, generator
    if embedder is None:
        embedder = SentenceTransformer(EMBEDDING_MODEL_NAME)
    if generator is None:
        generator = pipeline("text2text-generation", model=GEN_MODEL_NAME, device=-1)


def _init_ocr():
    """Lazily create the EasyOCR reader (idempotent)."""
    global ocr_reader
    if ocr_reader is None:
        # gpu=False for HF CPU Spaces
        ocr_reader = easyocr.Reader(OCR_LANGS, gpu=False)


# -----------------------------
# Text utils
# -----------------------------
def clean_text(text: str) -> str:
    """Normalize whitespace: drop NULs, collapse runs of spaces/tabs and
    3+ consecutive newlines, and strip surrounding whitespace."""
    if not text:
        return ""
    text = text.replace("\x00", " ")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def extract_text_from_pdf_text(pdf_path: str) -> str:
    """Try text extraction first (fast).

    Returns "" on any parser failure so callers can fall back to OCR.
    """
    try:
        reader = PdfReader(pdf_path)
        pages = []
        for page in reader.pages:
            pages.append(page.extract_text() or "")
        return clean_text("\n".join(pages))
    except Exception:
        # Deliberate best-effort: a broken/encrypted PDF falls back to OCR.
        return ""


def extract_text_from_txt(path: str) -> str:
    """Read a text file as UTF-8 (ignoring undecodable bytes) and clean it."""
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return clean_text(f.read())


# -----------------------------
# EasyOCR for scanned PDFs
# -----------------------------
def ocr_pdf_easyocr(pdf_path: str, max_pages: int = 10, dpi: int = 250) -> str:
    """
    Render each page to an image and run EasyOCR.
    Works well for full-page scanned documents (unlike TrOCR).

    :param pdf_path: path to the PDF file
    :param max_pages: hard cap on pages to OCR (speed control)
    :param dpi: render resolution; higher = better OCR, slower
    :return: cleaned OCR text, pages joined by blank lines
    """
    _init_ocr()
    doc = fitz.open(pdf_path)
    try:
        pages_to_process = min(len(doc), int(max_pages))
        extracted_pages = []
        zoom = dpi / 72.0  # PDF points are 72/inch
        mat = fitz.Matrix(zoom, zoom)
        for i in range(pages_to_process):
            page = doc.load_page(i)
            pix = page.get_pixmap(matrix=mat, alpha=False)
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                pix.height, pix.width, 3
            )
            # Preprocess improves OCR quality
            gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
            gray = cv2.bilateralFilter(gray, 9, 75, 75)
            _, th = cv2.threshold(
                gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
            )
            # EasyOCR expects RGB or grayscale; we pass thresholded grayscale
            results = ocr_reader.readtext(th, detail=0, paragraph=True)
            page_text = "\n".join([r.strip() for r in results if r and r.strip()])
            page_text = clean_text(page_text)
            if page_text:
                extracted_pages.append(page_text)
    finally:
        # Always release the document, even if rendering/OCR raises.
        doc.close()
    return clean_text("\n\n".join(extracted_pages))


# -----------------------------
# Chunking
# -----------------------------
def chunk_text(text: str, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Split *text* into overlapping character windows.

    :param chunk_size: window length in characters (clamped to >= 1)
    :param overlap: characters shared between consecutive windows
                    (clamped below chunk_size so the window always advances)
    :return: list of non-empty, stripped chunk strings
    """
    text = clean_text(text)
    if not text:
        return []
    # Guard: with overlap >= chunk_size the start position would never
    # advance and the loop below would never terminate. Defaults (450/80)
    # are unaffected by the clamps.
    chunk_size = max(int(chunk_size), 1)
    overlap = min(max(int(overlap), 0), chunk_size - 1)
    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(start + chunk_size, n)
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end == n:
            break
        start = max(0, end - overlap)
    return chunks


# -----------------------------
# FAISS index (cosine similarity)
# -----------------------------
def build_faiss_index(text_chunks):
    """Embed *text_chunks* and build an inner-product FAISS index.

    Vectors are L2-normalized first, so inner product == cosine similarity.
    Sets the module globals faiss_index / doc_chunks / is_index_ready.
    """
    global faiss_index, doc_chunks, is_index_ready
    _init_models()
    doc_chunks = list(text_chunks)
    vectors = embedder.encode(
        doc_chunks, convert_to_numpy=True, show_progress_bar=False
    ).astype("float32")
    faiss.normalize_L2(vectors)
    dim = vectors.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(vectors)
    faiss_index = index
    is_index_ready = True


def retrieve(query: str, k=TOP_K):
    """Return the top-*k* chunks for *query* as dicts with
    chunk_id / score / chunk. Empty list when no index is ready."""
    if not is_index_ready or faiss_index is None or not doc_chunks:
        return []
    q_vec = embedder.encode(
        [query], convert_to_numpy=True, show_progress_bar=False
    ).astype("float32")
    faiss.normalize_L2(q_vec)
    scores, ids = faiss_index.search(q_vec, k)
    ids = ids[0].tolist()
    scores = scores[0].tolist()
    results = []
    for cid, sc in zip(ids, scores):
        if cid == -1:
            # FAISS pads with -1 when k exceeds the number of indexed vectors.
            continue
        results.append({"chunk_id": cid, "score": float(sc), "chunk": doc_chunks[cid]})
    return results


# -----------------------------
# Generate answer
# -----------------------------
def generate_answer(query: str, retrieved_chunks):
    """Run the generator over the retrieved context and return its answer.

    :param retrieved_chunks: output of retrieve(); empty -> fixed fallback
    """
    if not retrieved_chunks:
        return "Not found in this document."
    # Build clean context (no chunk labels)
    context = "\n".join([r["chunk"] for r in retrieved_chunks])
    context = context[:MAX_CONTEXT_CHARS]  # keep prompt within model limits
    prompt = f"""Answer the question clearly and concisely using the document text.
Do NOT repeat raw OCR text or headers.
Return ONLY the final answer sentence.

DOCUMENT:
{context}

QUESTION: {query}

ANSWER:
"""
    out = generator(prompt, max_length=128, do_sample=False)[0]["generated_text"]
    return out.strip()


# -----------------------------
# Ingest
# -----------------------------
def ingest_files(files, force_ocr, ocr_max_pages, ocr_dpi):
    """Extract text from uploaded files, chunk it, and build the index.

    :param files: Gradio file objects (each exposes .name as a temp path)
    :param force_ocr: always OCR PDFs, even if they contain a text layer
    :param ocr_max_pages: page cap forwarded to ocr_pdf_easyocr
    :param ocr_dpi: render DPI forwarded to ocr_pdf_easyocr
    :return: multi-line status string for the UI
    """
    global is_index_ready, faiss_index, doc_chunks, last_ingested_text, last_debug_chunks
    start_time = time.time()
    # Reset all session state before re-ingesting.
    is_index_ready = False
    faiss_index = None
    doc_chunks = []
    last_ingested_text = ""
    last_debug_chunks = ""
    if not files:
        return "❌ Please upload at least one PDF/TXT/MD file."
    status = [f"📌 RobotInsight Ingest Started | Files: {len(files)}"]
    combined_text = []
    for f in files:
        path = f.name
        ext = os.path.splitext(path)[1].lower()
        status.append(f"➡️ Reading: {os.path.basename(path)}")
        try:
            text = ""
            if ext == ".pdf":
                # 1) Try normal text extraction
                text = extract_text_from_pdf_text(path)
                # 2) If it’s empty OR forced OCR, run EasyOCR
                if force_ocr or not text.strip():
                    status.append(
                        f"🔎 EasyOCR running (max_pages={int(ocr_max_pages)}, dpi={int(ocr_dpi)})..."
                    )
                    text = ocr_pdf_easyocr(
                        path, max_pages=int(ocr_max_pages), dpi=int(ocr_dpi)
                    )
            elif ext in [".txt", ".md"]:
                text = extract_text_from_txt(path)
            else:
                status.append(f"⚠️ Unsupported file type: {ext}")
                continue
            text = clean_text(text)
            if text:
                combined_text.append(text)
            else:
                status.append("⚠️ No usable text extracted from this file.")
        except Exception as e:
            # Per-file failure must not abort the whole ingest.
            status.append(f"❌ Error: {type(e).__name__}: {e}")
    full_text = clean_text("\n\n".join(combined_text))
    last_ingested_text = full_text
    if not full_text:
        return "\n".join(
            status + ["❌ No readable text found. Increase OCR pages/DPI or verify PDF."]
        )
    status.append("✂️ Chunking text...")
    chunks = chunk_text(full_text)
    status.append(f"✅ Chunks created: {len(chunks)}")
    status.append("🧠 Building embeddings + FAISS index...")
    build_faiss_index(chunks)
    elapsed = time.time() - start_time
    status.append(f"🎉 Ingest Complete in {elapsed:.2f} seconds")
    status.append("✅ Ready. Click Preview Extracted Text to verify it contains EPA text.")
    return "\n".join(status)


# -----------------------------
# Preview / Debug
# -----------------------------
def preview_ingested_text():
    """Return the first 4000 chars of the last ingested text, or a hint."""
    if not last_ingested_text.strip():
        return "❌ No text ingested yet. Upload and click Ingest."
    return last_ingested_text[:4000]


def debug_chunks_used():
    """Return the chunks retrieved for the last question, or a hint."""
    if not last_debug_chunks.strip():
        return "❌ Ask a question first, then click Debug."
    return last_debug_chunks


# -----------------------------
# Chat
# -----------------------------
def respond(user_message, history):
    """Chat handler: retrieve context for *user_message* and answer.

    History uses the messages format ({"role", "content"} dicts).
    Returns (updated_history, "") — the empty string clears the input box.
    """
    global last_debug_chunks
    if history is None:
        history = []
    user_message = (user_message or "").strip()
    if not user_message:
        return history, ""
    history.append({"role": "user", "content": user_message})
    try:
        _init_models()
        if not is_index_ready:
            history.append(
                {
                    "role": "assistant",
                    "content": "Please upload a document and click **Ingest** first.",
                }
            )
            return history, ""
        retrieved = retrieve(user_message, TOP_K)
        # Stash retrieved chunks for the Debug panel.
        last_debug_chunks = "\n\n".join(
            [f"[Chunk {r['chunk_id']} score={r['score']:.2f}]\n{r['chunk']}" for r in retrieved]
        )
        answer = generate_answer(user_message, retrieved)
        history.append({"role": "assistant", "content": answer})
        return history, ""
    except Exception as e:
        # Surface errors in-chat rather than crashing the UI callback.
        history.append({"role": "assistant", "content": f"❌ Error: {type(e).__name__}: {e}"})
        return history, ""


def reset_index():
    """Clear all index and preview/debug state (models stay loaded)."""
    global faiss_index, doc_chunks, is_index_ready, last_ingested_text, last_debug_chunks
    faiss_index = None
    doc_chunks = []
    is_index_ready = False
    last_ingested_text = ""
    last_debug_chunks = ""
    return "🧹 Reset done."
# ----------------------------- # UI # ----------------------------- with gr.Blocks(title=APP_TITLE) as demo: gr.Markdown( f"# πŸ€– {APP_TITLE}\n" "Upload β†’ Ingest β†’ Preview extracted text β†’ Ask questions.\n\n" "**If your PDF is scanned/image-based (like the EPA sample letter), turn ON Force OCR.**" ) file_upload = gr.File(file_count="multiple", file_types=[".pdf", ".txt", ".md"], label="Upload PDF / TXT / MD") with gr.Row(): force_ocr = gr.Checkbox(value=True, label="Force OCR (recommended for scanned PDFs)") ocr_max_pages = gr.Slider(1, 30, value=10, step=1, label="OCR Max Pages") ocr_dpi = gr.Slider(150, 350, value=250, step=10, label="OCR DPI") with gr.Row(): ingest_btn = gr.Button("βœ… Ingest", variant="primary") reset_btn = gr.Button("🧹 Reset") ingest_status = gr.Textbox(label="Ingest Status", lines=10) with gr.Row(): preview_btn = gr.Button("πŸ‘€ Preview Extracted Text") debug_btn = gr.Button("πŸ§ͺ Debug: Show Retrieved Chunks") preview_box = gr.Textbox(label="Extracted Text Preview (first 4000 chars)", lines=12) debug_box = gr.Textbox(label="Retrieved Chunks for last question", lines=12) gr.Markdown("## πŸ’¬ Chat") chatbot = gr.Chatbot(label="RobotInsight Chat", height=360) user_input = gr.Textbox(label="Ask a question", placeholder="Example: Who signed the letter?") send_btn = gr.Button("Send") ingest_btn.click(fn=ingest_files, inputs=[file_upload, force_ocr, ocr_max_pages, ocr_dpi], outputs=[ingest_status]) reset_btn.click(fn=reset_index, inputs=[], outputs=[ingest_status]) preview_btn.click(fn=preview_ingested_text, inputs=[], outputs=[preview_box]) debug_btn.click(fn=debug_chunks_used, inputs=[], outputs=[debug_box]) send_btn.click(fn=respond, inputs=[user_input, chatbot], outputs=[chatbot, user_input]) user_input.submit(fn=respond, inputs=[user_input, chatbot], outputs=[chatbot, user_input]) demo.launch()