# app.py — RobotInsight RAG bot (Hugging Face Space upload; revision c9a5392 by ganesh435)
import os
import time
import re
import gradio as gr
from pypdf import PdfReader
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import fitz # PyMuPDF
from PIL import Image
import easyocr
import cv2
APP_TITLE = "RobotInsight - RAG Bot (EasyOCR + Preview)"
# Models
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"  # sentence-transformers embedder
GEN_MODEL_NAME = "google/flan-t5-base"  # needs sentencepiece
# RAG
CHUNK_SIZE = 450          # characters per chunk
CHUNK_OVERLAP = 80        # characters shared between consecutive chunks
TOP_K = 6                 # chunks retrieved per question
MAX_CONTEXT_CHARS = 6500  # truncation limit for the generation prompt context
# OCR
# EasyOCR supports multiple languages; keep ["en"] for speed
OCR_LANGS = ["en"]
# Globals — lazily-initialized, mutated by the ingest/chat callbacks below
embedder = None           # SentenceTransformer, created in _init_models()
generator = None          # transformers text2text pipeline, created in _init_models()
ocr_reader = None         # easyocr.Reader, created in _init_ocr()
faiss_index = None        # faiss.IndexFlatIP over L2-normalized chunk vectors
doc_chunks = []           # chunk texts, positions aligned with FAISS ids
is_index_ready = False    # True once build_faiss_index() has completed
last_ingested_text = ""   # full extracted text, shown by the Preview button
last_debug_chunks = ""    # formatted retrieval hits from the last question (Debug button)
# -----------------------------
# Init models
# -----------------------------
def _init_models():
    """Lazily build the generator pipeline and the sentence embedder.

    Safe to call on every request: each model is constructed at most once
    and cached in the module-level globals.
    """
    global embedder, generator
    if generator is None:
        # device=-1 forces CPU inference.
        generator = pipeline("text2text-generation", model=GEN_MODEL_NAME, device=-1)
    if embedder is None:
        embedder = SentenceTransformer(EMBEDDING_MODEL_NAME)
def _init_ocr():
    """Create the EasyOCR reader once and reuse it afterwards."""
    global ocr_reader
    if ocr_reader is not None:
        return
    # gpu=False so the app also runs on CPU-only hosts (e.g. HF CPU Spaces).
    ocr_reader = easyocr.Reader(OCR_LANGS, gpu=False)
# -----------------------------
# Text utils
# -----------------------------
def clean_text(text: str) -> str:
    """Normalize whitespace in extracted text.

    Replaces NUL bytes with spaces, collapses runs of spaces/tabs into one
    space, squeezes three-or-more newlines down to a single blank line, and
    strips the ends. Falsy input yields "".
    """
    if not text:
        return ""
    without_nul = text.replace("\x00", " ")
    collapsed = re.sub(r"[ \t]+", " ", without_nul)
    squeezed = re.sub(r"\n{3,}", "\n\n", collapsed)
    return squeezed.strip()
def extract_text_from_pdf_text(pdf_path: str) -> str:
    """Extract the embedded text layer from a PDF via pypdf (fast, no OCR).

    Returns "" when the PDF cannot be parsed or yields no text, which lets
    the caller fall back to OCR.
    """
    try:
        page_texts = [page.extract_text() or "" for page in PdfReader(pdf_path).pages]
        return clean_text("\n".join(page_texts))
    except Exception:
        # Corrupt/encrypted/unreadable PDF: report "no text" instead of crashing.
        return ""
def extract_text_from_txt(path: str) -> str:
    """Read a UTF-8 text file (undecodable bytes ignored) and normalize it."""
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        raw = handle.read()
    return clean_text(raw)
# -----------------------------
# EasyOCR for scanned PDFs
# -----------------------------
def ocr_pdf_easyocr(pdf_path: str, max_pages: int = 10, dpi: int = 250) -> str:
    """
    Render each page to an image and run EasyOCR.
    Works well for full-page scanned documents (unlike TrOCR).

    Args:
        pdf_path: Path to the PDF to OCR.
        max_pages: Upper bound on pages processed (cost control).
        dpi: Render resolution; higher is slower but typically more accurate.

    Returns:
        Cleaned text of all recognized pages joined by blank lines
        ("" when nothing was recognized).
    """
    _init_ocr()
    doc = fitz.open(pdf_path)
    pages_to_process = min(len(doc), int(max_pages))
    extracted_pages = []
    # PyMuPDF's base resolution is 72 dpi; scale the transform to hit `dpi`.
    zoom = dpi / 72.0
    mat = fitz.Matrix(zoom, zoom)
    for i in range(pages_to_process):
        page = doc.load_page(i)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        # NOTE(review): reshape assumes 3 samples per pixel (RGB, no alpha) —
        # confirm pix.n == 3 for non-RGB source PDFs.
        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, 3)
        # Preprocess improves OCR quality: grayscale -> denoise -> Otsu binarize.
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        gray = cv2.bilateralFilter(gray, 9, 75, 75)
        _, th = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        # EasyOCR expects RGB or grayscale; we pass thresholded grayscale.
        # detail=0 returns plain strings; paragraph=True merges nearby lines.
        results = ocr_reader.readtext(th, detail=0, paragraph=True)
        page_text = "\n".join([r.strip() for r in results if r and r.strip()])
        page_text = clean_text(page_text)
        if page_text:
            extracted_pages.append(page_text)
    doc.close()
    return clean_text("\n\n".join(extracted_pages))
# -----------------------------
# Chunking
# -----------------------------
def chunk_text(text: str, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Split text into overlapping, character-based chunks.

    Args:
        text: Source text; normalized with clean_text() before splitting.
        chunk_size: Maximum characters per chunk.
        overlap: Characters repeated between consecutive chunks.

    Returns:
        List of non-empty chunk strings ([] for empty/blank input).
    """
    text = clean_text(text)
    if not text:
        return []
    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(start + chunk_size, n)
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end == n:
            break
        # Advance by chunk_size - overlap, but always guarantee forward
        # progress: the original `max(0, end - overlap)` looped forever
        # whenever a caller passed overlap >= chunk_size.
        next_start = end - overlap
        start = next_start if next_start > start else end
    return chunks
# -----------------------------
# FAISS index (cosine similarity)
# -----------------------------
def build_faiss_index(text_chunks):
    """Embed the chunks and (re)build the global cosine-similarity index.

    Vectors are L2-normalized so inner product equals cosine similarity.
    Side effects: replaces faiss_index and doc_chunks, sets is_index_ready.
    """
    global faiss_index, doc_chunks, is_index_ready
    _init_models()
    doc_chunks = list(text_chunks)
    vectors = embedder.encode(
        doc_chunks, convert_to_numpy=True, show_progress_bar=False
    ).astype("float32")
    faiss.normalize_L2(vectors)
    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(vectors)
    faiss_index = index
    is_index_ready = True
def retrieve(query: str, k=TOP_K):
    """Return up to k best-matching chunks for the query.

    Each hit is a dict {"chunk_id", "score", "chunk"}; returns [] when no
    index has been built yet.
    """
    if not is_index_ready or faiss_index is None or not doc_chunks:
        return []
    q_vec = embedder.encode(
        [query], convert_to_numpy=True, show_progress_bar=False
    ).astype("float32")
    faiss.normalize_L2(q_vec)
    scores, ids = faiss_index.search(q_vec, k)
    hits = []
    for cid, score in zip(ids[0].tolist(), scores[0].tolist()):
        # FAISS pads results with -1 when fewer than k vectors exist.
        if cid != -1:
            hits.append({"chunk_id": cid, "score": float(score), "chunk": doc_chunks[cid]})
    return hits
# -----------------------------
# Generate answer
# -----------------------------
def generate_answer(query: str, retrieved_chunks):
    """Generate a concise answer from the retrieved chunks with the text2text model."""
    if not retrieved_chunks:
        return "Not found in this document."
    # Build clean context (no chunk labels), truncated to the prompt budget.
    context = "\n".join(hit["chunk"] for hit in retrieved_chunks)[:MAX_CONTEXT_CHARS]
    prompt = f"""Answer the question clearly and concisely using the document text.
Do NOT repeat raw OCR text or headers.
Return ONLY the final answer sentence.
DOCUMENT:
{context}
QUESTION:
{query}
ANSWER:
"""
    outputs = generator(prompt, max_length=128, do_sample=False)
    return outputs[0]["generated_text"].strip()
# -----------------------------
# Ingest
# -----------------------------
def ingest_files(files, force_ocr, ocr_max_pages, ocr_dpi):
    """Gradio callback: extract text from uploads, chunk it, build the index.

    Args:
        files: Uploaded file objects (each exposes a .name filesystem path).
        force_ocr: When truthy, run EasyOCR on PDFs even if text extraction worked.
        ocr_max_pages: Slider value forwarded to ocr_pdf_easyocr (page cap).
        ocr_dpi: Slider value forwarded to ocr_pdf_easyocr (render DPI).

    Returns:
        A multi-line status string shown in the "Ingest Status" textbox.
    """
    global is_index_ready, faiss_index, doc_chunks, last_ingested_text, last_debug_chunks
    start_time = time.time()
    # Reset all derived state first so a failed ingest never leaves a stale index.
    is_index_ready = False
    faiss_index = None
    doc_chunks = []
    last_ingested_text = ""
    last_debug_chunks = ""
    if not files:
        return "❌ Please upload at least one PDF/TXT/MD file."
    status = [f"📌 RobotInsight Ingest Started | Files: {len(files)}"]
    combined_text = []
    for f in files:
        path = f.name
        ext = os.path.splitext(path)[1].lower()
        status.append(f"➡️ Reading: {os.path.basename(path)}")
        try:
            text = ""
            if ext == ".pdf":
                # 1) Try normal text extraction
                text = extract_text_from_pdf_text(path)
                # 2) If it’s empty OR forced OCR, run EasyOCR
                if force_ocr or not text.strip():
                    status.append(f"🔎 EasyOCR running (max_pages={int(ocr_max_pages)}, dpi={int(ocr_dpi)})...")
                    text = ocr_pdf_easyocr(path, max_pages=int(ocr_max_pages), dpi=int(ocr_dpi))
            elif ext in [".txt", ".md"]:
                text = extract_text_from_txt(path)
            else:
                status.append(f"⚠️ Unsupported file type: {ext}")
                continue
            text = clean_text(text)
            if text:
                combined_text.append(text)
            else:
                status.append("⚠️ No usable text extracted from this file.")
        except Exception as e:
            # Per-file failures are reported in the status log but do not
            # abort ingestion of the remaining files.
            status.append(f"❌ Error: {type(e).__name__}: {e}")
    full_text = clean_text("\n\n".join(combined_text))
    last_ingested_text = full_text  # kept for the Preview button
    if not full_text:
        return "\n".join(status + ["❌ No readable text found. Increase OCR pages/DPI or verify PDF."])
    status.append("✂️ Chunking text...")
    chunks = chunk_text(full_text)
    status.append(f"✅ Chunks created: {len(chunks)}")
    status.append("🧠 Building embeddings + FAISS index...")
    build_faiss_index(chunks)
    elapsed = time.time() - start_time
    status.append(f"🎉 Ingest Complete in {elapsed:.2f} seconds")
    status.append("✅ Ready. Click Preview Extracted Text to verify it contains EPA text.")
    return "\n".join(status)
# -----------------------------
# Preview / Debug
# -----------------------------
def preview_ingested_text():
    """Return the first 4000 characters of the last ingested text for review."""
    if last_ingested_text.strip():
        return last_ingested_text[:4000]
    return "❌ No text ingested yet. Upload and click Ingest."
def debug_chunks_used():
    """Return the chunks that were retrieved for the most recent question."""
    if last_debug_chunks.strip():
        return last_debug_chunks
    return "❌ Ask a question first, then click Debug."
# -----------------------------
# Chat
# -----------------------------
def respond(user_message, history):
    """Gradio chat callback: retrieve context, answer, and extend the history.

    Returns (updated messages-format history, "") — the empty string clears
    the input textbox. Errors are surfaced as assistant messages.
    """
    global last_debug_chunks
    if history is None:
        history = []
    user_message = (user_message or "").strip()
    if not user_message:
        return history, ""
    history.append({"role": "user", "content": user_message})
    try:
        _init_models()
        if not is_index_ready:
            history.append({"role": "assistant", "content": "Please upload a document and click **Ingest** first."})
            return history, ""
        retrieved = retrieve(user_message, TOP_K)
        # Stash a readable dump of the hits for the Debug button.
        last_debug_chunks = "\n\n".join(
            f"[Chunk {hit['chunk_id']} score={hit['score']:.2f}]\n{hit['chunk']}"
            for hit in retrieved
        )
        history.append({"role": "assistant", "content": generate_answer(user_message, retrieved)})
    except Exception as e:
        history.append({"role": "assistant", "content": f"❌ Error: {type(e).__name__}: {e}"})
    return history, ""
def reset_index():
    """Drop the FAISS index and wipe all cached ingest/debug state."""
    global faiss_index, doc_chunks, is_index_ready, last_ingested_text, last_debug_chunks
    is_index_ready = False
    faiss_index = None
    doc_chunks = []
    last_debug_chunks = ""
    last_ingested_text = ""
    return "🧹 Reset done."
# -----------------------------
# UI
# -----------------------------
# -----------------------------
# UI layout and event wiring
# -----------------------------
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(
        f"# 🤖 {APP_TITLE}\n"
        "Upload → Ingest → Preview extracted text → Ask questions.\n\n"
        "**If your PDF is scanned/image-based (like the EPA sample letter), turn ON Force OCR.**"
    )
    file_upload = gr.File(file_count="multiple", file_types=[".pdf", ".txt", ".md"], label="Upload PDF / TXT / MD")
    with gr.Row():
        force_ocr = gr.Checkbox(value=True, label="Force OCR (recommended for scanned PDFs)")
        ocr_max_pages = gr.Slider(1, 30, value=10, step=1, label="OCR Max Pages")
        ocr_dpi = gr.Slider(150, 350, value=250, step=10, label="OCR DPI")
    with gr.Row():
        ingest_btn = gr.Button("✅ Ingest", variant="primary")
        reset_btn = gr.Button("🧹 Reset")
    ingest_status = gr.Textbox(label="Ingest Status", lines=10)
    with gr.Row():
        preview_btn = gr.Button("👀 Preview Extracted Text")
        debug_btn = gr.Button("🧪 Debug: Show Retrieved Chunks")
    preview_box = gr.Textbox(label="Extracted Text Preview (first 4000 chars)", lines=12)
    debug_box = gr.Textbox(label="Retrieved Chunks for last question", lines=12)
    gr.Markdown("## 💬 Chat")
    # type="messages" matches respond(), which appends {"role": ..., "content": ...}
    # dicts; the legacy tuples format would reject that history shape.
    chatbot = gr.Chatbot(label="RobotInsight Chat", height=360, type="messages")
    user_input = gr.Textbox(label="Ask a question", placeholder="Example: Who signed the letter?")
    send_btn = gr.Button("Send")
    # Event wiring: buttons/submit → callbacks defined above.
    ingest_btn.click(fn=ingest_files, inputs=[file_upload, force_ocr, ocr_max_pages, ocr_dpi], outputs=[ingest_status])
    reset_btn.click(fn=reset_index, inputs=[], outputs=[ingest_status])
    preview_btn.click(fn=preview_ingested_text, inputs=[], outputs=[preview_box])
    debug_btn.click(fn=debug_chunks_used, inputs=[], outputs=[debug_box])
    send_btn.click(fn=respond, inputs=[user_input, chatbot], outputs=[chatbot, user_input])
    user_input.submit(fn=respond, inputs=[user_input, chatbot], outputs=[chatbot, user_input])

demo.launch()